From 92a401700a2c4a799279aa6127a9a1ca9d0f2642 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Wed, 28 Oct 2020 13:33:27 -0700 Subject: [PATCH 1/8] trace --- sourmash/logging.py | 27 ++++++++++++++++++++++++++- sourmash/sbt.py | 16 ++++++++++------ sourmash/sbtmh.py | 18 ++++++++++++++++-- 3 files changed, 52 insertions(+), 9 deletions(-) diff --git a/sourmash/logging.py b/sourmash/logging.py index 49c3dc26b3..097c0e194b 100644 --- a/sourmash/logging.py +++ b/sourmash/logging.py @@ -1,13 +1,27 @@ +import atexit +import os import sys from io import StringIO _quiet = False _debug = False def set_quiet(val, print_debug=False): - global _quiet, _debug + global _quiet, _debug, _trace _quiet = bool(val) _debug = bool(print_debug) +#_trace = True if "SOURMASH_TRACE" in os.environ else False +if "SOURMASH_TRACE" in os.environ: + _trace = open(os.environ["SOURMASH_TRACE"], "w") + + @atexit.register + def flush_and_close(): + global _trace + _trace.flush() + _trace.close() +else: + _trace = None + def print_results(s, *args, **kwargs): if _quiet: @@ -41,6 +55,17 @@ def debug(s, *args, **kwargs): sys.stderr.flush() +def trace(s, *args, **kwargs): + "Low level execution information (even more verbose than debug)" + if not _trace: + return + + print(s.format(*args, **kwargs), file=_trace, + end=kwargs.get('end', u'\n')) + if kwargs.get('flush'): + sys.stderr.flush() + + def error(s, *args, **kwargs): "A simple error logging function => stderr." print(u'\r\033[K', end=u'', file=sys.stderr) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 9cae16b693..8aea91e303 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -56,7 +56,7 @@ def search_transcript(node, seq, threshold): from .exceptions import IndexNotSupported from .sbt_storage import FSStorage, IPFSStorage, RedisStorage, ZipStorage -from .logging import error, notify, debug +from .logging import error, notify, debug, trace from .index import Index from .nodegraph import Nodegraph, extract_nodegraph_info, calc_expected_collisions @@ -317,6 +317,7 @@ def find(self, search_fn, *args, **kwargs): if node_p not in visited: visited.add(node_p) + trace("(TRAVERSAL) {0}", node_p) # apply search fn. If return false, truncate search. if search_fn(node_g, *args): @@ -327,9 +328,10 @@ def find(self, search_fn, *args, **kwargs): elif isinstance(node_g, Node): if kwargs.get('dfs', True): # defaults search to dfs for c in self.children(node_p): - queue.insert(0, c.pos) + if c.node: + queue.insert(0, c.pos) else: # bfs - queue.extend(c.pos for c in self.children(node_p)) + queue.extend(c.pos for c in self.children(node_p) if c.node) if unload_data: node_g.unload() @@ -1087,13 +1089,15 @@ def print(self): while stack: node_p = stack.pop() node_g = self._nodes.get(node_p, None) + if node_g is None: + node_g = self._leaves.get(node_p, None) if node_p not in visited and node_g is not None: visited.add(node_p) depth = int(math.floor(math.log(node_p + 1, self.d))) - print(" " * 4 * depth, node_g) + print(" " * 4 * depth, node_p, node_g) if isinstance(node_g, Node): - stack.extend(c.pos for c in self.children(node_p) - if c.pos not in visited) + stack.extend(reversed([c.pos for c in self.children(node_p) + if c.pos not in visited])) def __iter__(self): for i, node in self._nodes.items(): diff --git a/sourmash/sbtmh.py b/sourmash/sbtmh.py index c8b879118c..cce5bf2b2e 100644 --- a/sourmash/sbtmh.py +++ b/sourmash/sbtmh.py @@ -3,6 +3,7 @@ from .sbt import Leaf, SBT, GraphFactory from . import signature +from .logging import trace def load_sbt_index(filename, *, print_version_warning=True, cache_size=None): @@ -112,6 +113,8 @@ def search_minhashes(node, sig, threshold, results=None): else: # Node minhash comparison score = _max_jaccard_underneath_internal_node(node, sig_mh) + trace("(SCORE) {0}: {1}", node.name, score) + if results is not None: results[node.name] = score @@ -134,6 +137,8 @@ def search(self, node, sig, threshold, results=None): else: # internal object, not leaf. score = _max_jaccard_underneath_internal_node(node, sig_mh) + trace("(SCORE) {0}: {1}", node.name, score) + if results is not None: results[node.name] = score @@ -156,10 +161,15 @@ def search_minhashes_containment(node, sig, threshold, results=None, downsample= else: # Node or Leaf, Nodegraph by minhash comparison matches = node.data.matches(mh) + len_mh = max(len(mh), 1) + + score = float(matches) / len_mh + trace("(SCORE) {0}: {1}", node.name, score) + if results is not None: - results[node.name] = float(matches) / len(mh) + results[node.name] = score - if len(mh) and float(matches) / len(mh) >= threshold: + if len_mh and score >= threshold: return 1 return 0 @@ -170,7 +180,9 @@ def __init__(self): def search(self, node, query, threshold, results=None): mh = query.minhash + score = 0 if not len(mh): + trace("(SCORE) {0}: 0", node.name) return 0 if isinstance(node, SigLeaf): @@ -179,9 +191,11 @@ def search(self, node, query, threshold, results=None): matches = node.data.matches(mh) if not matches: + trace("(SCORE) {0}: 0", node.name) return 0 score = float(matches) / len(mh) + trace("(SCORE) {0}: {1}", node.name, score) if score < threshold: return 0 From e72ae7c2e80ac6fa5d77c2a620a85b45da2cae00 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Wed, 23 Sep 2020 17:24:44 -0700 Subject: [PATCH 2/8] SBT scaffolding broken impl with tests want union, not intersection; one-level rotation for subtrees use it in index CLI, fix bug for single-leaf SBT add a HLL impl, move alphabet stuff to encodings hll ffi expose cardinality, add_hash and add_sequence new ertl ml estimator for HLL implemented joint mle reorganize hll and estimators fix rust checks and add python tests for hll working on fixing scaffolding issues new hll methods working on partial saving add trace to logging --- sourmash/commands.py | 9 +- sourmash/nodegraph.py | 61 +++ sourmash/sbt.py | 365 +++++++++++++++++- src/core/src/sketch/hyperloglog/estimators.rs | 2 +- tests/test_sbt.py | 43 ++- 5 files changed, 472 insertions(+), 8 deletions(-) diff --git a/sourmash/commands.py b/sourmash/commands.py index 73b39adef8..4fbc3c7e19 100644 --- a/sourmash/commands.py +++ b/sourmash/commands.py @@ -12,6 +12,7 @@ from . import signature as sig from . import sourmash_args from .logging import notify, error, print_results, set_quiet +from .sbt import scaffold from .sbtmh import SearchMinHashesFindBest, SigLeaf from .sourmash_args import DEFAULT_LOAD_K, FileOutput @@ -327,10 +328,14 @@ def index(args): set_quiet(args.quiet) moltype = sourmash_args.calculate_moltype(args) + batch = False if args.append: tree = load_sbt_index(args.sbt_name) else: tree = create_sbt_index(args.bf_size, n_children=args.n_children) + batch = True + # TODO: set up storage here + # TODO: deal with factory and args.bf_size too? if args.sparseness < 0 or args.sparseness > 1.0: error('sparseness must be in range [0.0, 1.0].') @@ -375,7 +380,7 @@ def index(args): ss.minhash = ss.minhash.downsample(scaled=args.scaled) scaleds.add(ss.minhash.scaled) - tree.insert(ss) + tree.insert(ss, batch=batch) n += 1 if not ss: @@ -398,6 +403,8 @@ def index(args): error('nums = {}; scaleds = {}', repr(nums), repr(scaleds)) sys.exit(-1) + tree = tree.finish() + notify('') # did we load any!? diff --git a/sourmash/nodegraph.py b/sourmash/nodegraph.py index 8faa2eb874..884baa1e86 100644 --- a/sourmash/nodegraph.py +++ b/sourmash/nodegraph.py @@ -1,5 +1,7 @@ # -*- coding: UTF-8 -*- +from collections import namedtuple +import math from struct import pack, unpack import sys from tempfile import NamedTemporaryFile @@ -159,3 +161,62 @@ def calc_expected_collisions(graph, force=False, max_false_pos=.2): raise SystemExit(1) return fp_all + + +def optimal_size(num_kmers, mem_cap=None, fp_rate=None): + """ + Utility function for estimating optimal nodegraph args. + + - num_kmers: number of unique kmers [required] + - mem_cap: the allotted amount of memory [optional, conflicts with f] + - fp_rate: the desired false positive rate [optional, conflicts with M] + """ + if all((num_kmers is not None, mem_cap is not None, fp_rate is None)): + return estimate_optimal_with_K_and_M(num_kmers, mem_cap) + elif all((num_kmers is not None, mem_cap is None, fp_rate is not None)): + return estimate_optimal_with_K_and_f(num_kmers, fp_rate) + else: + raise TypeError("num_kmers and either mem_cap or fp_rate" + " must be defined.") + + +def estimate_optimal_with_K_and_M(num_kmers, mem_cap): + """ + Estimate optimal nodegraph args. + + - num_kmers: number of unique kmer + - mem_cap: the allotted amount of memory + """ + n_tables = math.log(2) * (mem_cap / float(num_kmers)) + int_n_tables = int(n_tables) + if int_n_tables == 0: + int_n_tables = 1 + ht_size = int(mem_cap / int_n_tables) + mem_cap = ht_size * int_n_tables + fp_rate = (1 - math.exp(-num_kmers / float(ht_size))) ** int_n_tables + res = namedtuple("result", ["num_htables", "htable_size", "mem_use", + "fp_rate"]) + return res(int_n_tables, ht_size, mem_cap, fp_rate) + + +def estimate_optimal_with_K_and_f(num_kmers, des_fp_rate): + """ + Estimate optimal memory. + + - num_kmers: the number of unique kmers + - des_fp_rate: the desired false positive rate + """ + n_tables = math.log(des_fp_rate, 0.5) + int_n_tables = int(n_tables) + if int_n_tables == 0: + int_n_tables = 1 + + ht_size = int(-num_kmers / ( + math.log(1 - des_fp_rate ** (1 / float(int_n_tables))))) + ht_size = max(ht_size, 1) + mem_cap = ht_size * int_n_tables + fp_rate = (1 - math.exp(-num_kmers / float(ht_size))) ** int_n_tables + + res = namedtuple("result", ["num_htables", "htable_size", "mem_use", + "fp_rate"]) + return res(int_n_tables, ht_size, mem_cap, fp_rate) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 8aea91e303..b419913d63 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -44,6 +44,7 @@ def search_transcript(node, seq, threshold): from collections import namedtuple, Counter from collections.abc import Mapping +import heapq from copy import copy import json @@ -58,7 +59,8 @@ def search_transcript(node, seq, threshold): from .sbt_storage import FSStorage, IPFSStorage, RedisStorage, ZipStorage from .logging import error, notify, debug, trace from .index import Index -from .nodegraph import Nodegraph, extract_nodegraph_info, calc_expected_collisions +from .nodegraph import Nodegraph, extract_nodegraph_info, calc_expected_collisions, optimal_size +from .hll import HLL STORAGES = { 'FSStorage': FSStorage, @@ -177,6 +179,7 @@ def __init__(self, factory, *, d=2, storage=None, cache_size=None): self._nodes = {} self._missing_nodes = set() self._leaves = {} + self._batched = [] self.d = d self.next_node = 0 self.storage = storage @@ -229,12 +232,31 @@ def new_node_pos(self, node): return self.next_node - def insert(self, signature): + def insert(self, signature, batch=False): "Add a new SourmashSignature in to the SBT." from .sbtmh import SigLeaf - + leaf = SigLeaf(signature.md5sum(), signature) - self.add_node(leaf) + + if batch: + self._batched.append(leaf) + else: + self.add_node(leaf) + + def finish(self): + # If not in batch mode, nothing to do + if not self._batched: + return self + + # TODO: check if SBT is empty, if it is not add current leaves to be + # scaffolded too + tree = scaffold(self._batched, self.storage) + self._batched.clear() + + # TODO: make this inplace? + # self.__dict__.update(tree.__dict__) + # return self + return tree def add_node(self, node): pos = self.new_node_pos(node) @@ -320,7 +342,6 @@ def find(self, search_fn, *args, **kwargs): trace("(TRAVERSAL) {0}", node_p) # apply search fn. If return false, truncate search. if search_fn(node_g, *args): - # leaf node? it's a match! if isinstance(node_g, Leaf): matches.append(node_g) @@ -1269,6 +1290,340 @@ def load(cls, info, storage=None): storage=storage) +def scaffold(original_datasets, storage, bf_size=None): + """ Generate an SBT with nodes clustered by amount of shared hashes + + Parameters + ---------- + + datasets: List[Union[SourmashSignature, Leaf]] + List of signatures, or already existing leaves from another SBT, to index + storage: Storage + Where to save the nodes data during construction + bf_size: int, optional + Size for Nodegraphs. If None, will use a HyperLogLog counter to estimate + unique k-mers and calculate an optimal size + + Returns + ------- + SBT + A newly initialized SBT with nodes clustered by amount of shared hashes + """ + + from .sbtmh import SigLeaf + from .signature import SourmashSignature + + InternalNode = namedtuple("InternalNode", "element left right") + BinaryLeaf = namedtuple("BinaryLeaf", "element") + THRESHOLD = 0 + + hll = None + ksize = None + + datasets = [] + for d in original_datasets: + try: + d_mh = d.data.minhash + except AttributeError: + d_mh = d.minhash + + # replace MH dataset by HLL + d_hll = HLL(0.01, d_mh.ksize) + d_hll.update(d_mh) + + # save MH to storage + if isinstance(d, SourmashSignature): + d = SigLeaf(d.md5sum(), d) + + if isinstance(d, SigLeaf): + d.data + d.storage = storage + + data = { + 'd': d, + 'hll': d_hll, + } + filename = os.path.basename(d.name) + if isinstance(storage, ZipStorage): + if storage.subdir is None: + name = storage.path.split('/')[-1][:-8] + storage.subdir = '.sbt.{}'.format(name) + # TODO: set _path internally when calling save()? + d._path = d.save(os.path.join(storage.subdir, filename)) + elif storage is not None: + d._path = d.save(filename) + + # unload MH + if storage is not None: + d.unload() + + datasets.append(data) + + else: + raise ValueError("unknown dataset type") + + del original_datasets + + heap = [] + for i, data1 in enumerate(datasets): + if i % 100 == 0: + print(f"processed {i} out of {len(datasets)}", end='\r') + + d1 = data1['hll'] + + if hll is None and bf_size is None: + ksize = d1.ksize + hll = HLL(0.01, ksize) + + if hll is not None: + hll.update(d1) + + for j, data2 in enumerate(datasets): + if i > j: + d2 = data2['hll'] + common = d1.intersection(d2) + + # TODO: check for a low threshold to insert + # (need to change logic further down before setting threshold) + if common >= THRESHOLD: + # heapq defaults to a min heap, + # invert "common" here to avoid having to use the + # internal methods for a max heap + heapq.heappush(heap, (-common, i, j)) + + if hll is not None: + n_unique_hashes = len(hll) + num_htables, htable_size, mem_use, fp_rate = optimal_size(n_unique_hashes, fp_rate=0.9) + # TODO: check this, we prefer ntables = 1? + #htable_size *= num_htables + else: + htable_size = bf_size + num_htables = 1 + + factory = GraphFactory(31, htable_size, num_htables) + + processed = set() + total_datasets = len(datasets) + next_round = [] + while heap: + i = len(heap) + if i % 100 == 0: + print(f"processed {i} out of {total_datasets}", end='\r') + + (_, p1, p2) = heapq.heappop(heap) + if p1 not in processed and p2 not in processed: + data1 = datasets[p1] + data2 = datasets[p2] + data1['processed'] = True + data2['processed'] = True + processed.add(p1) + processed.add(p2) + + d1 = data1['hll'] + d2 = data2['hll'] + + element = HLL(0.01, d1.ksize) + element.update(d1) + element.update(d2) + + new_internal = InternalNode(element, BinaryLeaf(data1['d']), BinaryLeaf(data2['d'])) + next_round.append(new_internal) + elif p1 in processed and p2 in processed: + # already processed both, nothing to do + continue + else: + # at least one (p1, p2) is still valid. + # only process if it is the last one + if total_datasets - len(processed) == 1: + d = datasets[p1] + if d is None: + d = datasets[p2] + assert not d['processed'] + d['processed'] = True + processed.add(p2) + else: + d['processed'] = True + processed.add(p1) + + common = d['hll'] + + new_internal = InternalNode(common, BinaryLeaf(d['d']), None) + next_round.append(new_internal) + + # Finish processing leaves, start dealing with internal nodes in next_round + while len(next_round) > 1: + current_round = next_round + next_round = [] + total_nodes = len(current_round) + + heap = [] + for (i, d1) in enumerate(current_round): + for (j, d2) in enumerate(current_round): + if i > j: + common = d1.element.intersection(d2.element) + heapq.heappush(heap, (-common, i, j)) + + processed = set() + while heap: + i = len(heap) + if i % 100 == 0: + print(f"processed {i} out of {total_nodes}", end='\r') + + (_, p1, p2) = heapq.heappop(heap) + if p1 not in processed and p2 not in processed: + d1 = current_round[p1] + d2 = current_round[p2] + current_round[p1] = None + current_round[p2] = None + processed.add(p1) + processed.add(p2) + + element = HLL(0.01, ksize) + element.update(d1.element) + element.update(d2.element) + + new_internal = InternalNode(element, d1, d2) + next_round.append(new_internal) + + # TODO: make Node (data = Nodegraphs) for d1 and d2 + # use the factory to create Nodegraph + # TODO: save d1 and d2 Nodes into storage and unload them + # PROBLEM: d1,d2 don't have stable names, + # and the SBT format uses internal.N where N is + # the position in the tree... + # SOLUTION: use md5 as name, and save internal nodes + # inside internal/MD5 ? + # (need new SBT version bump?) + + elif p1 in processed and p2 in processed: + # already processed both, nothing to do + continue + else: + # at least one (p1, p2) is still valid. + # process it if it is the last element left + if total_nodes - len(processed) == 1: + d = current_round[p1] + if d is None: + d = current_round[p2] + current_round[p2] = None + assert d is not None + processed.add(p2) + else: + processed.add(p1) + current_round[p1] = None + + new_internal = InternalNode(d.element, d, None) + next_round.append(new_internal) + + # TODO: make Node (data = Nodegraphs) for d + # use the factory to create Nodegraph + # TODO: save d into storage and unload it + # PROBLEM: d doesn't have a stable name, + # and the SBT format uses internal.N where N is + # the position in the tree... + # SOLUTION: use md5 as name, and save internal nodes + # inside internal/MD5 ? + # (need new SBT version bump) + + # next_round only contains the root of the SBT + # Convert from binary tree to nodes/leaves + if total_datasets == 1: + d = datasets.pop() + common = d['hll'] + root = InternalNode(common, BinaryLeaf(d['d']), None) + else: + root = next_round.pop() + # This approach puts the more complete tree on the right, + # we prefer to have it on the left, + # so let's rotate it for now. + root = InternalNode(root.element, root.right, root.left) + + assert not next_round + + nodes = {} + leaves = {} + + def check_lift(cnode): + # Base case + if isinstance(cnode, BinaryLeaf): + return cnode + + if cnode.right is None and cnode.left is None: + return cnode + elif cnode.right is None and cnode.left is not None: + return check_lift(cnode.left) + elif cnode.right is not None and cnode.left is None: + return check_lift(cnode.right) + + # search the tree in order and build an SBT + visited = set() + queue = [(0, root)] + while queue: + (pos, cnode) = queue.pop(0) + if pos not in visited: + visited.add(pos) + #if pos == 6: + # import pdb; pdb.set_trace() + + if isinstance(cnode, BinaryLeaf): + leaves[pos] = cnode.element + elif isinstance(cnode, InternalNode): + #to_lift = check_lift(cnode) + to_lift = None + if cnode.right is None and cnode.left is not None: + to_lift = cnode.left + elif cnode.right is not None and cnode.left is None: + to_lift = cnode.right + + if to_lift: + # This is a lift: if the internal node only have one child, + # lift the child to be the node + cnode = to_lift + if isinstance(cnode, BinaryLeaf): + # if InternalNode only has one child and it is a Leaf, + # use it instead (avoid creating a Node with only one child) + leaves[pos] = cnode.element + continue + + new_node = Node(factory, name=f"internal.{pos}") + data = new_node.data + + # TODO: we can't iterate elements with HLL... + #for e in cnode.element: + # data.count(e) + + nodes[pos] = new_node + + # TODO: this is a one-level rotation of the internal nodes, + # since we want all "complete" subtrees to be to the left, + # and all spaces as much to the right as possible. + # Ideally do a pre-processing step before searching the tree in + # order and building the SBT. + left = cnode.left + right = cnode.right + if isinstance(left, InternalNode) and isinstance(right, InternalNode): + # make sure the left one is "more complete" than the right one + if any(c is None for c in (left.left, left.right)): + left, right = right, left + + if left: + queue.append((2 * pos + 1, left)) + if right: + queue.append((2 * pos + 2, right)) + + new_tree = SBT(factory, storage=storage) + new_tree._nodes = nodes + new_tree._leaves = leaves + + # TODO: none of these should be necessarily, if we can save both internal + # and leaf nodes consistently + new_tree._fill_min_n_below() + # TODO: need to modify _fill_internal to allow unloading after saving? + new_tree._fill_internal() + + return new_tree + + def filter_distance(filter_a, filter_b, n=1000): """ Compute a heuristic distance per bit between two Bloom filters. diff --git a/src/core/src/sketch/hyperloglog/estimators.rs b/src/core/src/sketch/hyperloglog/estimators.rs index 4c2fbe02cc..f3a6cb8197 100644 --- a/src/core/src/sketch/hyperloglog/estimators.rs +++ b/src/core/src/sketch/hyperloglog/estimators.rs @@ -32,7 +32,7 @@ pub fn mle(counts: &[u16], p: usize, q: usize, relerr: f64) -> f64 { let mut z = 0.; for i in num_iter::range_step_inclusive(k_max_prime as i32, k_min_prime as i32, -1) { - z = 0.5 * z + counts[i as usize] as f64; + z = 0.5 * z + (counts[i as usize] as f64); } // ldexp(x, i) = x * (2 ** i) diff --git a/tests/test_sbt.py b/tests/test_sbt.py index 4d7bb51db6..944d4f990b 100644 --- a/tests/test_sbt.py +++ b/tests/test_sbt.py @@ -7,7 +7,7 @@ import sourmash from sourmash import load_one_signature, SourmashSignature, load_signatures from sourmash.exceptions import IndexNotSupported -from sourmash.sbt import SBT, GraphFactory, Leaf, Node +from sourmash.sbt import SBT, GraphFactory, Leaf, Node, scaffold from sourmash.sbtmh import (SigLeaf, search_minhashes, search_minhashes_containment) from sourmash.sbt_storage import (FSStorage, RedisStorage, @@ -915,3 +915,44 @@ def test_sbt_node_cache(): assert tree._nodescache.currsize == 1 assert tree._nodescache.currsize == 1 + + +def test_sbt_scaffold(tmpdir): + factory = GraphFactory(31, 1e5, 4) + + tree = SBT(factory) + leaves = [] + + for f in utils.SIG_FILES: + sig = next(load_signatures(utils.get_test_data(f))) + leaf = SigLeaf(os.path.basename(f), sig) + tree.add_node(leaf) + leaves.append(leaf) + to_search = leaf + + print('*' * 60) + print("{}:".format(to_search.metadata)) + old_result = {str(s) for s in tree.find(search_minhashes, + to_search.data, 0.1)} + print(*old_result, sep='\n') + + with ZipStorage(str(tmpdir.join("tree.sbt.zip"))) as storage: + tree = scaffold(list(leaves), storage) + new_leaves = set(tree.leaves()) + assert len(new_leaves) == len(leaves) + assert new_leaves == set(leaves) + + tree.save(str(tmpdir.join("tree")), storage=storage) + + with ZipStorage(str(tmpdir.join("tree.sbt.zip"))) as storage: + tree = SBT.load(str(tmpdir.join("tree")), + leaf_loader=SigLeaf.load, + storage=storage) + + print('*' * 60) + print("{}:".format(to_search.metadata)) + new_result = {str(s) for s in tree.find(search_minhashes, + to_search.data, 0.1)} + print(*new_result, sep='\n') + + assert old_result == new_result From 33c958f1225dbd43a9412f1a1185084f12ac6805 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Wed, 28 Oct 2020 21:01:43 -0700 Subject: [PATCH 3/8] use Nodegraphs for intersection --- include/sourmash.h | 2 + sourmash/nodegraph.py | 8 +++ sourmash/sbt.py | 97 ++++++++++++++++---------------- src/core/src/ffi/nodegraph.rs | 12 ++++ src/core/src/sketch/nodegraph.rs | 22 ++++---- 5 files changed, 81 insertions(+), 60 deletions(-) diff --git a/include/sourmash.h b/include/sourmash.h index b5e111b662..56224dc90b 100644 --- a/include/sourmash.h +++ b/include/sourmash.h @@ -263,6 +263,8 @@ uintptr_t nodegraph_get_kmer(const SourmashNodegraph *ptr, const char *kmer); const uint64_t *nodegraph_hashsizes(const SourmashNodegraph *ptr, uintptr_t *size); +uintptr_t nodegraph_intersection_count(const SourmashNodegraph *ptr, const SourmashNodegraph *optr); + uintptr_t nodegraph_ksize(const SourmashNodegraph *ptr); uintptr_t nodegraph_matches(const SourmashNodegraph *ptr, const SourmashKmerMinHash *mh_ptr); diff --git a/sourmash/nodegraph.py b/sourmash/nodegraph.py index 884baa1e86..2b6b1d897f 100644 --- a/sourmash/nodegraph.py +++ b/sourmash/nodegraph.py @@ -88,6 +88,14 @@ def matches(self, mh): return self._methodcall(lib.nodegraph_matches, mh._objptr) + def intersection_count(self, other): + if isinstance(other, Nodegraph): + return self._methodcall(lib.nodegraph_intersection_count, other._objptr) + else: + # FIXME: we could take MinHash and sets here too (or anything that can be + # converted to a list of ints...) + raise TypeError("Must be a Nodegraph") + def to_khmer_nodegraph(self): import khmer try: diff --git a/sourmash/sbt.py b/sourmash/sbt.py index b419913d63..4ffc7bb0bb 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -1322,27 +1322,14 @@ def scaffold(original_datasets, storage, bf_size=None): datasets = [] for d in original_datasets: - try: - d_mh = d.data.minhash - except AttributeError: - d_mh = d.minhash - - # replace MH dataset by HLL - d_hll = HLL(0.01, d_mh.ksize) - d_hll.update(d_mh) - - # save MH to storage if isinstance(d, SourmashSignature): d = SigLeaf(d.md5sum(), d) + # save MH to storage if isinstance(d, SigLeaf): d.data d.storage = storage - data = { - 'd': d, - 'hll': d_hll, - } filename = os.path.basename(d.name) if isinstance(storage, ZipStorage): if storage.subdir is None: @@ -1353,15 +1340,9 @@ def scaffold(original_datasets, storage, bf_size=None): elif storage is not None: d._path = d.save(filename) - # unload MH - if storage is not None: - d.unload() - - datasets.append(data) - + datasets.append(d) else: raise ValueError("unknown dataset type") - del original_datasets heap = [] @@ -1369,7 +1350,7 @@ def scaffold(original_datasets, storage, bf_size=None): if i % 100 == 0: print(f"processed {i} out of {len(datasets)}", end='\r') - d1 = data1['hll'] + d1 = data1.data.minhash if hll is None and bf_size is None: ksize = d1.ksize @@ -1380,8 +1361,8 @@ def scaffold(original_datasets, storage, bf_size=None): for j, data2 in enumerate(datasets): if i > j: - d2 = data2['hll'] - common = d1.intersection(d2) + d2 = data2.data.minhash + common = d1.count_common(d2) # TODO: check for a low threshold to insert # (need to change logic further down before setting threshold) @@ -1396,12 +1377,15 @@ def scaffold(original_datasets, storage, bf_size=None): num_htables, htable_size, mem_use, fp_rate = optimal_size(n_unique_hashes, fp_rate=0.9) # TODO: check this, we prefer ntables = 1? #htable_size *= num_htables - else: - htable_size = bf_size + print(len(hll), num_htables, htable_size) + #else: + if 1: + htable_size = 1e5 num_htables = 1 - factory = GraphFactory(31, htable_size, num_htables) + factory = GraphFactory(ksize, htable_size, num_htables) + print("Processing first round of internal") processed = set() total_datasets = len(datasets) next_round = [] @@ -1414,20 +1398,26 @@ def scaffold(original_datasets, storage, bf_size=None): if p1 not in processed and p2 not in processed: data1 = datasets[p1] data2 = datasets[p2] - data1['processed'] = True - data2['processed'] = True + datasets[p1] = None + datasets[p2] = None processed.add(p1) processed.add(p2) - d1 = data1['hll'] - d2 = data2['hll'] + d1 = data1.data.minhash + d2 = data2.data.minhash - element = HLL(0.01, d1.ksize) + #element = HLL(0.01, d1.ksize) + element = factory() element.update(d1) element.update(d2) - new_internal = InternalNode(element, BinaryLeaf(data1['d']), BinaryLeaf(data2['d'])) + new_internal = InternalNode(element, BinaryLeaf(data1), BinaryLeaf(data2)) next_round.append(new_internal) + + # unload d1 and d2 + data1.unload() + data2.unload() + elif p1 in processed and p2 in processed: # already processed both, nothing to do continue @@ -1438,20 +1428,25 @@ def scaffold(original_datasets, storage, bf_size=None): d = datasets[p1] if d is None: d = datasets[p2] - assert not d['processed'] - d['processed'] = True + assert d is not None + datasets[p2] = None processed.add(p2) else: - d['processed'] = True + datasets[p1] = None processed.add(p1) - common = d['hll'] + common = factory() + element.update(d.data.minhash) - new_internal = InternalNode(common, BinaryLeaf(d['d']), None) + new_internal = InternalNode(common, BinaryLeaf(d), None) next_round.append(new_internal) + # unload d + d.unload() + # Finish processing leaves, start dealing with internal nodes in next_round while len(next_round) > 1: + print("Processing next round of internal") current_round = next_round next_round = [] total_nodes = len(current_round) @@ -1460,7 +1455,7 @@ def scaffold(original_datasets, storage, bf_size=None): for (i, d1) in enumerate(current_round): for (j, d2) in enumerate(current_round): if i > j: - common = d1.element.intersection(d2.element) + common = d1.element.intersection_count(d2.element) heapq.heappush(heap, (-common, i, j)) processed = set() @@ -1478,7 +1473,7 @@ def scaffold(original_datasets, storage, bf_size=None): processed.add(p1) processed.add(p2) - element = HLL(0.01, ksize) + element = factory() element.update(d1.element) element.update(d2.element) @@ -1529,8 +1524,9 @@ def scaffold(original_datasets, storage, bf_size=None): # Convert from binary tree to nodes/leaves if total_datasets == 1: d = datasets.pop() - common = d['hll'] - root = InternalNode(common, BinaryLeaf(d['d']), None) + common = factory() + common.update(d.data.minhash) + root = InternalNode(common, BinaryLeaf(d), None) else: root = next_round.pop() # This approach puts the more complete tree on the right, @@ -1582,15 +1578,20 @@ def check_lift(cnode): if isinstance(cnode, BinaryLeaf): # if InternalNode only has one child and it is a Leaf, # use it instead (avoid creating a Node with only one child) + + # Technically the parent here should have all the data, + # but... it didn't. I might be missing something + # somewhere else... + ppos = int(math.floor((pos - 1) / 2)) + if ppos != -1: + # ppos == -1 means this is the root node + nodes[ppos].data.update(cnode.element.data.minhash) + leaves[pos] = cnode.element continue new_node = Node(factory, name=f"internal.{pos}") - data = new_node.data - - # TODO: we can't iterate elements with HLL... - #for e in cnode.element: - # data.count(e) + new_node.data = cnode.element nodes[pos] = new_node @@ -1619,7 +1620,7 @@ def check_lift(cnode): # and leaf nodes consistently new_tree._fill_min_n_below() # TODO: need to modify _fill_internal to allow unloading after saving? - new_tree._fill_internal() + #new_tree._fill_internal() return new_tree diff --git a/src/core/src/ffi/nodegraph.rs b/src/core/src/ffi/nodegraph.rs index 29a1c3e84c..cc2191a0d4 100644 --- a/src/core/src/ffi/nodegraph.rs +++ b/src/core/src/ffi/nodegraph.rs @@ -149,6 +149,18 @@ pub unsafe extern "C" fn nodegraph_update( ong.update(ng).unwrap(); } +#[no_mangle] +pub unsafe extern "C" fn nodegraph_intersection_count( + ptr: *const SourmashNodegraph, + optr: *const SourmashNodegraph, +) -> usize { + let ng = SourmashNodegraph::as_rust(ptr); + let ong = SourmashNodegraph::as_rust(optr); + + // FIXME raise an exception properly + ng.intersection_count(ong) +} + #[no_mangle] pub unsafe extern "C" fn nodegraph_update_mh( ptr: *mut SourmashNodegraph, diff --git a/src/core/src/sketch/nodegraph.rs b/src/core/src/sketch/nodegraph.rs index 1e2fac1eb2..50e07855bd 100644 --- a/src/core/src/sketch/nodegraph.rs +++ b/src/core/src/sketch/nodegraph.rs @@ -289,31 +289,29 @@ impl Nodegraph { self.unique_kmers } - pub fn similarity(&self, other: &Nodegraph) -> f64 { - let result: usize = self - .bs + pub fn intersection_count(&self, other: &Nodegraph) -> usize { + self.bs .iter() .zip(&other.bs) .map(|(bs, bs_other)| bs.intersection(bs_other).count()) - .sum(); + .sum() + } + + pub fn similarity(&self, other: &Nodegraph) -> f64 { + let intersection = self.intersection_count(other); let size: usize = self .bs .iter() .zip(&other.bs) .map(|(bs, bs_other)| bs.union(bs_other).count()) .sum(); - result as f64 / size as f64 + intersection as f64 / size as f64 } pub fn containment(&self, other: &Nodegraph) -> f64 { - let result: usize = self - .bs - .iter() - .zip(&other.bs) - .map(|(bs, bs_other)| bs.intersection(bs_other).count()) - .sum(); + let intersection = self.intersection_count(other); let size: usize = self.bs.iter().map(|bs| bs.len()).sum(); - result as f64 / size as f64 + intersection as f64 / size as f64 } } From ed8ffa84984d19b748ff9335a11c606458995a1d Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Fri, 30 Oct 2020 10:21:48 -0700 Subject: [PATCH 4/8] build Nodegraph earlier --- sourmash/sbt.py | 54 ++++++++++++++++++++----------------------------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 4ffc7bb0bb..ef5c40d9b1 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -1403,15 +1403,12 @@ def scaffold(original_datasets, storage, bf_size=None): processed.add(p1) processed.add(p2) - d1 = data1.data.minhash - d2 = data2.data.minhash + # Name will be updated later, when we have the right position + new_node = Node(factory, name=f"internal.XXX") + data1.update(new_node) + data2.update(new_node) - #element = HLL(0.01, d1.ksize) - element = factory() - element.update(d1) - element.update(d2) - - new_internal = InternalNode(element, BinaryLeaf(data1), BinaryLeaf(data2)) + new_internal = InternalNode(new_node, BinaryLeaf(data1), BinaryLeaf(data2)) next_round.append(new_internal) # unload d1 and d2 @@ -1435,10 +1432,11 @@ def scaffold(original_datasets, storage, bf_size=None): datasets[p1] = None processed.add(p1) - common = factory() - element.update(d.data.minhash) + # Name will be updated later, when we have the right position + new_node = Node(factory, name=f"internal.XXX") + d.update(new_node) - new_internal = InternalNode(common, BinaryLeaf(d), None) + new_internal = InternalNode(new_node, BinaryLeaf(d), None) next_round.append(new_internal) # unload d @@ -1455,7 +1453,7 @@ def scaffold(original_datasets, storage, bf_size=None): for (i, d1) in enumerate(current_round): for (j, d2) in enumerate(current_round): if i > j: - common = d1.element.intersection_count(d2.element) + common = d1.element.data.intersection_count(d2.element.data) heapq.heappush(heap, (-common, i, j)) processed = set() @@ -1473,15 +1471,14 @@ def scaffold(original_datasets, storage, bf_size=None): processed.add(p1) processed.add(p2) - element = factory() - element.update(d1.element) - element.update(d2.element) + # Name will be updated later, when we have the right position + new_node = Node(factory, name=f"internal.XXX") + d1.element.update(new_node) + d2.element.update(new_node) - new_internal = InternalNode(element, d1, d2) + new_internal = InternalNode(new_node, d1, d2) next_round.append(new_internal) - # TODO: make Node (data = Nodegraphs) for d1 and d2 - # use the factory to create Nodegraph # TODO: save d1 and d2 Nodes into storage and unload them # PROBLEM: d1,d2 don't have stable names, # and the SBT format uses internal.N where N is @@ -1507,11 +1504,12 @@ def scaffold(original_datasets, storage, bf_size=None): processed.add(p1) current_round[p1] = None - new_internal = InternalNode(d.element, d, None) + new_node = Node(factory, name=f"internal.XXX") + d.element.update(new_node) + + new_internal = InternalNode(new_node, d, None) next_round.append(new_internal) - # TODO: make Node (data = Nodegraphs) for d - # use the factory to create Nodegraph # TODO: save d into storage and unload it # PROBLEM: d doesn't have a stable name, # and the SBT format uses internal.N where N is @@ -1585,15 +1583,13 @@ def check_lift(cnode): ppos = int(math.floor((pos - 1) / 2)) if ppos != -1: # ppos == -1 means this is the root node - nodes[ppos].data.update(cnode.element.data.minhash) + cnode.element.update(nodes[ppos]) leaves[pos] = cnode.element continue - new_node = Node(factory, name=f"internal.{pos}") - new_node.data = cnode.element - - nodes[pos] = new_node + cnode.element.name = f"internal.{pos}" + nodes[pos] = cnode.element # TODO: this is a one-level rotation of the internal nodes, # since we want all "complete" subtrees to be to the left, @@ -1616,12 +1612,6 @@ def check_lift(cnode): new_tree._nodes = nodes new_tree._leaves = leaves - # TODO: none of these should be necessarily, if we can save both internal - # and leaf nodes consistently - new_tree._fill_min_n_below() - # TODO: need to modify _fill_internal to allow unloading after saving? - #new_tree._fill_internal() - return new_tree From a6f72e6d03608eb6366c85ece6f5968d42993f12 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Fri, 30 Oct 2020 13:56:03 -0700 Subject: [PATCH 5/8] take factory instead of bf_size for scaffold --- sourmash/cli/index.py | 4 +++- sourmash/commands.py | 8 +++++--- sourmash/sbt.py | 27 +++++++++++++++++---------- sourmash/sbtmh.py | 5 ++++- 4 files changed, 29 insertions(+), 15 deletions(-) diff --git a/sourmash/cli/index.py b/sourmash/cli/index.py index 57fcb8ef40..b71e295a1a 100644 --- a/sourmash/cli/index.py +++ b/sourmash/cli/index.py @@ -55,7 +55,9 @@ def subparser(subparsers): ) subparser.add_argument( '-x', '--bf-size', metavar='S', type=float, default=1e5, - help='Bloom filter size used for internal nodes' + help='Maximum Bloom filter size used for internal nodes. Set to 0 ' + 'to calculate an optimal size given the complexity of the datasets ' + '(using a HyperLogLog to estimate the number of unique k-mers' ) subparser.add_argument( '-f', '--force', action='store_true', diff --git a/sourmash/commands.py b/sourmash/commands.py index 4fbc3c7e19..86ec7af630 100644 --- a/sourmash/commands.py +++ b/sourmash/commands.py @@ -12,7 +12,6 @@ from . import signature as sig from . import sourmash_args from .logging import notify, error, print_results, set_quiet -from .sbt import scaffold from .sbtmh import SearchMinHashesFindBest, SigLeaf from .sourmash_args import DEFAULT_LOAD_K, FileOutput @@ -332,10 +331,13 @@ def index(args): if args.append: tree = load_sbt_index(args.sbt_name) else: - tree = create_sbt_index(args.bf_size, n_children=args.n_children) + bf_size = args.bf_size + if bf_size == 0: + bf_size = None + + tree = create_sbt_index(bf_size, n_children=args.n_children) batch = True # TODO: set up storage here - # TODO: deal with factory and args.bf_size too? if args.sparseness < 0 or args.sparseness > 1.0: error('sparseness must be in range [0.0, 1.0].') diff --git a/sourmash/sbt.py b/sourmash/sbt.py index ef5c40d9b1..3e0e262fe4 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -250,7 +250,7 @@ def finish(self): # TODO: check if SBT is empty, if it is not add current leaves to be # scaffolded too - tree = scaffold(self._batched, self.storage) + tree = scaffold(self._batched, self.storage, self.factory) self._batched.clear() # TODO: make this inplace? @@ -1290,7 +1290,7 @@ def load(cls, info, storage=None): storage=storage) -def scaffold(original_datasets, storage, bf_size=None): +def scaffold(original_datasets, storage, factory=None): """ Generate an SBT with nodes clustered by amount of shared hashes Parameters @@ -1300,9 +1300,9 @@ def scaffold(original_datasets, storage, bf_size=None): List of signatures, or already existing leaves from another SBT, to index storage: Storage Where to save the nodes data during construction - bf_size: int, optional - Size for Nodegraphs. If None, will use a HyperLogLog counter to estimate - unique k-mers and calculate an optimal size + factory: Factory, optional + A factory for internal nodes. If None, will use a HyperLogLog counter + to estimate unique k-mers and calculate an optimal size for Nodegraphs. Returns ------- @@ -1345,6 +1345,10 @@ def scaffold(original_datasets, storage, bf_size=None): raise ValueError("unknown dataset type") del original_datasets + # TODO: we can build the heap in parallel. + # on top of doing the count_common calculations in parallel, + # we can also avoid building the heap (just build a list first) + # and then call heapify on it after the list is ready heap = [] for i, data1 in enumerate(datasets): if i % 100 == 0: @@ -1352,7 +1356,7 @@ def scaffold(original_datasets, storage, bf_size=None): d1 = data1.data.minhash - if hll is None and bf_size is None: + if hll is None and factory is None: ksize = d1.ksize hll = HLL(0.01, ksize) @@ -1372,18 +1376,17 @@ def scaffold(original_datasets, storage, bf_size=None): # internal methods for a max heap heapq.heappush(heap, (-common, i, j)) - if hll is not None: + if factory is None: n_unique_hashes = len(hll) num_htables, htable_size, mem_use, fp_rate = optimal_size(n_unique_hashes, fp_rate=0.9) # TODO: check this, we prefer ntables = 1? #htable_size *= num_htables print(len(hll), num_htables, htable_size) - #else: - if 1: + htable_size = 1e5 num_htables = 1 - factory = GraphFactory(ksize, htable_size, num_htables) + factory = GraphFactory(ksize, htable_size, num_htables) print("Processing first round of internal") processed = set() @@ -1449,6 +1452,10 @@ def scaffold(original_datasets, storage, bf_size=None): next_round = [] total_nodes = len(current_round) + # TODO: we can build the heap in parallel. + # on top of doing the intersection_count calculations in parallel, + # we can also avoid building the heap (just build a list first) + # and then call heapify on it after the list is ready heap = [] for (i, d1) in enumerate(current_round): for (j, d2) in enumerate(current_round): diff --git a/sourmash/sbtmh.py b/sourmash/sbtmh.py index cce5bf2b2e..282ef1ca3a 100644 --- a/sourmash/sbtmh.py +++ b/sourmash/sbtmh.py @@ -15,7 +15,10 @@ def load_sbt_index(filename, *, print_version_warning=True, cache_size=None): def create_sbt_index(bloom_filter_size=1e5, n_children=2): "Create an empty SBT index." - factory = GraphFactory(1, bloom_filter_size, 4) + if bloom_filter_size == 0: + factory = None + else: + factory = GraphFactory(1, bloom_filter_size, 4) tree = SBT(factory, d=n_children) return tree From 3a65fb8d9e6ef302450fd0a76dd56f94aa66961c Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Fri, 30 Oct 2020 20:27:44 -0700 Subject: [PATCH 6/8] wip split internal and leaf nodes in storages --- sourmash/sbt.py | 107 ++++++++++++++++++++++------------------ sourmash/sbt_storage.py | 4 ++ sourmash/sbtmh.py | 14 ++++-- 3 files changed, 74 insertions(+), 51 deletions(-) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 3e0e262fe4..0fc5c8333d 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -44,6 +44,7 @@ def search_transcript(node, seq, threshold): from collections import namedtuple, Counter from collections.abc import Mapping +import hashlib import heapq from copy import copy @@ -581,7 +582,7 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): """ info = {} info['d'] = self.d - info['version'] = 6 + info['version'] = 7 info["index_type"] = self.__class__.__name__ # TODO: check # choose between ZipStorage and FS (file system/directory) storage. @@ -634,17 +635,11 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): if random() - sparseness <= 0: continue - data = { - # TODO: start using md5sum instead? - 'filename': os.path.basename(node.name), - 'name': node.name - } - + data = {"name": node.name} try: node.metadata.pop('max_n_below') except (AttributeError, KeyError): pass - data['metadata'] = node.metadata if structure_only is False: @@ -653,10 +648,14 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): node.storage = storage + basepath = None if kind == "Zip": - node.save(os.path.join(subdir, data['filename'])) - elif kind == "FS": - data['filename'] = node.save(data['filename']) + basepath = subdir + + data["filename"] = _save_node(node, basepath) + else: + # TODO: a dry-run mode for calculating the name? + data["filename"] = node._path if isinstance(node, Node): nodes[i] = data @@ -761,6 +760,7 @@ def load(cls, location, *, leaf_loader=None, storage=None, print_version_warning 4: cls._load_v4, 5: cls._load_v5, 6: cls._load_v6, + 7: cls._load_v6, } try: @@ -1099,7 +1099,7 @@ def print_dot(self): for i, node in self._nodes.items(): if isinstance(node, Node): print('"{}" [shape=box fillcolor=gray style=filled]'.format( - node.name)) + ode.name)) for j, child in self.children(i): if child is not None: print('"{}" -> "{}"'.format(node.name, child.name)) @@ -1196,9 +1196,17 @@ def __str__(self): name=self.name, nb=self.data.n_occupied(), fpr=calc_expected_collisions(self.data, True, 1.1)) - def save(self, path): + def save(self, subdir=None): buf = self.data.to_bytes(compression=1) - return self.storage.save(path, buf) + hash_md5 = hashlib.md5() + hash_md5.update(buf) + path = "internal/" + hash_md5.hexdigest() + + if subdir is not None: + path = os.path.join(subdir, path) + + self._path = self.storage.save(path, buf) + return self._path @property def data(self): @@ -1275,9 +1283,17 @@ def unload(self): # TODO: Check that data is actually in the storage? self._data = None - def save(self, path): + def save(self, subdir=None): buf = self.data.to_bytes(compression=1) - return self.storage.save(path, buf) + hash_md5 = hashlib.md5() + hash_md5.update(buf) + path = hash_md5.hexdigest() + + if subdir is not None: + path = os.path.join(subdir, path) + + self._path = self.storage.save("signatures/" + path, buf) + return self._path def update(self, parent): parent.data.update(self.data) @@ -1320,6 +1336,13 @@ def scaffold(original_datasets, storage, factory=None): hll = None ksize = None + subdir = None + if isinstance(storage, ZipStorage): + if storage.subdir is None: + name = storage.path.split('/')[-1][:-8] + storage.subdir = '.sbt.{}'.format(name) + subdir = storage.subdir + datasets = [] for d in original_datasets: if isinstance(d, SourmashSignature): @@ -1328,18 +1351,9 @@ def scaffold(original_datasets, storage, factory=None): # save MH to storage if isinstance(d, SigLeaf): d.data - d.storage = storage - - filename = os.path.basename(d.name) - if isinstance(storage, ZipStorage): - if storage.subdir is None: - name = storage.path.split('/')[-1][:-8] - storage.subdir = '.sbt.{}'.format(name) - # TODO: set _path internally when calling save()? - d._path = d.save(os.path.join(storage.subdir, filename)) - elif storage is not None: - d._path = d.save(filename) - + if storage is not None: + d.storage = storage + _save_node(d, subdir) datasets.append(d) else: raise ValueError("unknown dataset type") @@ -1478,21 +1492,19 @@ def scaffold(original_datasets, storage, factory=None): processed.add(p1) processed.add(p2) - # Name will be updated later, when we have the right position - new_node = Node(factory, name=f"internal.XXX") + new_node = Node(factory) d1.element.update(new_node) d2.element.update(new_node) new_internal = InternalNode(new_node, d1, d2) next_round.append(new_internal) - # TODO: save d1 and d2 Nodes into storage and unload them - # PROBLEM: d1,d2 don't have stable names, - # and the SBT format uses internal.N where N is - # the position in the tree... - # SOLUTION: use md5 as name, and save internal nodes - # inside internal/MD5 ? - # (need new SBT version bump?) + # save d1 and d2 Nodes into storage and unload them, if a storage is available + if storage is not None: + _save_node(d1.element) + d1.element.unload() + _save_node(d2.element) + d2.element.unload() elif p1 in processed and p2 in processed: # already processed both, nothing to do @@ -1511,19 +1523,16 @@ def scaffold(original_datasets, storage, factory=None): processed.add(p1) current_round[p1] = None - new_node = Node(factory, name=f"internal.XXX") + new_node = Node(factory) d.element.update(new_node) new_internal = InternalNode(new_node, d, None) next_round.append(new_internal) - # TODO: save d into storage and unload it - # PROBLEM: d doesn't have a stable name, - # and the SBT format uses internal.N where N is - # the position in the tree... - # SOLUTION: use md5 as name, and save internal nodes - # inside internal/MD5 ? - # (need new SBT version bump) + # save d into storage and unload it, if a storage is available + if storage is not None: + _save_node(d.element) + d.element.unload() # next_round only contains the root of the SBT # Convert from binary tree to nodes/leaves @@ -1563,8 +1572,6 @@ def check_lift(cnode): (pos, cnode) = queue.pop(0) if pos not in visited: visited.add(pos) - #if pos == 6: - # import pdb; pdb.set_trace() if isinstance(cnode, BinaryLeaf): leaves[pos] = cnode.element @@ -1595,7 +1602,6 @@ def check_lift(cnode): leaves[pos] = cnode.element continue - cnode.element.name = f"internal.{pos}" nodes[pos] = cnode.element # TODO: this is a one-level rotation of the internal nodes, @@ -1622,6 +1628,11 @@ def check_lift(cnode): return new_tree +def _save_node(node, basepath=None): + path = basepath + return node.save(path) + + def filter_distance(filter_a, filter_b, n=1000): """ Compute a heuristic distance per bit between two Bloom filters. diff --git a/sourmash/sbt_storage.py b/sourmash/sbt_storage.py index 3d78f7eaee..3bdf3a9306 100644 --- a/sourmash/sbt_storage.py +++ b/sourmash/sbt_storage.py @@ -72,6 +72,10 @@ def save(self, path, content): newpath = "{}_{}".format(path, n) fullpath = os.path.join(self.location, self.subdir, newpath) + dirpath = os.path.dirname(fullpath) + if not os.path.exists(dirpath): + os.makedirs(dirpath) + with open(fullpath, 'wb') as f: f.write(content) diff --git a/sourmash/sbtmh.py b/sourmash/sbtmh.py index 282ef1ca3a..725bb69c88 100644 --- a/sourmash/sbtmh.py +++ b/sourmash/sbtmh.py @@ -1,4 +1,5 @@ from io import BytesIO +import os import sys from .sbt import Leaf, SBT, GraphFactory @@ -43,14 +44,21 @@ def __str__(self): return '**Leaf:{name} -> {metadata}'.format( name=self.name, metadata=self.metadata) - def save(self, path): + def save(self, subdir=None): # this is here only for triggering the property load # before we reopen the file (and overwrite the previous # content...) self.data + path = "signatures/" + self.data.md5sum() + + if subdir is not None: + path = os.path.join(subdir, path) + buf = signature.save_signatures([self.data], compression=1) - return self.storage.save(path, buf) + self._path = self.storage.save(path, buf) + + return self._path def update(self, parent): mh = self.data.minhash @@ -66,7 +74,7 @@ def update(self, parent): @property def data(self): if self._data is None: - buf = BytesIO(self.storage.load(self._path)) + buf = self.storage.load(self._path) self._data = signature.load_one_signature(buf) return self._data From 2701a837db27f5fa10fd16329d05d8e63628f2c3 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 31 Oct 2020 13:30:16 -0700 Subject: [PATCH 7/8] split storage setup to new method --- sourmash/commands.py | 7 ++++ sourmash/sbt.py | 98 ++++++++++++++++++++++++++------------------ 2 files changed, 64 insertions(+), 41 deletions(-) diff --git a/sourmash/commands.py b/sourmash/commands.py index 86ec7af630..15fccfe9eb 100644 --- a/sourmash/commands.py +++ b/sourmash/commands.py @@ -337,7 +337,10 @@ def index(args): tree = create_sbt_index(bf_size, n_children=args.n_children) batch = True + # TODO: set up storage here + storage_info = tree._setup_storage(args.sbt_name) + tree.storage = storage_info.storage if args.sparseness < 0 or args.sparseness > 1.0: error('sparseness must be in range [0.0, 1.0].') @@ -415,6 +418,10 @@ def index(args): sys.exit(-1) notify('loaded {} sigs; saving SBT under "{}"', n, args.sbt_name) + # TODO: if all nodes are already saved (like in the scaffold/batch case) + # we can potentially set structure_only=True here. An alternative is to + # modify Node.save to verify if the data is already saved or still need to + # be saved (dirty flag?) tree.save(args.sbt_name, sparseness=args.sparseness) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 0fc5c8333d..8ed91b6db5 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -72,6 +72,9 @@ def search_transcript(node, seq, threshold): NodePos = namedtuple("NodePos", ["pos", "node"]) +StorageInfo = namedtuple("StorageInfo", + ["kind", "storage", "backend", "name", "subdir", + "storage_args", "index_filename"]) class GraphFactory(object): @@ -557,40 +560,14 @@ def child(self, parent, pos): node = self._nodes.get(cd, None) return NodePos(cd, node) - def save(self, path, storage=None, sparseness=0.0, structure_only=False): - """Saves an SBT description locally and node data to a storage. - - Parameters - ---------- - path : str - path to where the SBT description should be saved. - storage : Storage, optional - Storage to be used for saving node data. - Defaults to FSStorage (a hidden directory at the same level of path) - sparseness : float - How much of the internal nodes should be saved. - Defaults to 0.0 (save all internal nodes data), - can go up to 1.0 (don't save any internal nodes data) - structure_only: boolean - Write only the index schema and metadata, but not the data. - Defaults to False (save data too) - - Returns - ------- - str - full path to the new SBT description - """ - info = {} - info['d'] = self.d - info['version'] = 7 - info["index_type"] = self.__class__.__name__ # TODO: check - + def _setup_storage(self, path, storage=None): # choose between ZipStorage and FS (file system/directory) storage. if not path.endswith(".sbt.json"): kind = "Zip" if not path.endswith('.sbt.zip'): path += '.sbt.zip' - storage = ZipStorage(path) + if storage is None: + storage = ZipStorage(path) backend = "FSStorage" name = os.path.basename(path[:-8]) subdir = '.sbt.{}'.format(name) @@ -600,6 +577,7 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): else: # path.endswith('.sbt.json') assert path.endswith('.sbt.json') kind = "FS" + subdir = None name = os.path.basename(path) name = name[:-9] index_filename = os.path.abspath(path) @@ -615,9 +593,43 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): backend = [k for (k, v) in STORAGES.items() if v == type(storage)][0] storage_args = storage.init_args() + return StorageInfo(kind, storage, backend, name, subdir, storage_args, index_filename) + + def save(self, path, storage=None, sparseness=0.0, structure_only=False): + """Saves an SBT description locally and node data to a storage. + + Parameters + ---------- + path : str + path to where the SBT description should be saved. + storage : Storage, optional + Storage to be used for saving node data. + Defaults to FSStorage (a hidden directory at the same level of path) + sparseness : float + How much of the internal nodes should be saved. + Defaults to 0.0 (save all internal nodes data), + can go up to 1.0 (don't save any internal nodes data) + structure_only: boolean + Write only the index schema and metadata, but not the data. + Defaults to False (save data too) + + Returns + ------- + str + full path to the new SBT description + """ + info = {} + info['d'] = self.d + info['version'] = 7 + info["index_type"] = self.__class__.__name__ # TODO: check + + if storage is None and self.storage is not None: + storage = self.storage + sinfo = self._setup_storage(path, storage) + info['storage'] = { - 'backend': backend, - 'args': storage_args + 'backend': sinfo.backend, + 'args': sinfo.storage_args } info['factory'] = { 'class': GraphFactory.__name__, @@ -646,11 +658,11 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): # trigger data loading before saving to the new place node.data - node.storage = storage + node.storage = sinfo.storage basepath = None - if kind == "Zip": - basepath = subdir + if sinfo.kind == "Zip": + basepath = sinfo.subdir data["filename"] = _save_node(node, basepath) else: @@ -669,17 +681,17 @@ def save(self, path, storage=None, sparseness=0.0, structure_only=False): info['nodes'] = nodes info['signatures'] = leaves - if kind == "Zip": + if sinfo.kind == "Zip": tree_data = json.dumps(info).encode("utf-8") - save_path = "{}.sbt.json".format(name) - storage.save(save_path, tree_data) - storage.close() + save_path = "{}.sbt.json".format(sinfo.name) + sinfo.storage.save(save_path, tree_data) + sinfo.storage.close() - elif kind == "FS": - with open(index_filename, 'w') as fp: + elif sinfo.kind == "FS": + with open(sinfo.index_filename, 'w') as fp: json.dump(info, fp) - notify("Finished saving SBT index, available at {0}\n".format(index_filename)) + notify("Finished saving SBT index, available at {0}\n".format(sinfo.index_filename)) return path @@ -1501,8 +1513,11 @@ def scaffold(original_datasets, storage, factory=None): # save d1 and d2 Nodes into storage and unload them, if a storage is available if storage is not None: + d1.element.storage = storage _save_node(d1.element) d1.element.unload() + + d2.element.storage = storage _save_node(d2.element) d2.element.unload() @@ -1531,6 +1546,7 @@ def scaffold(original_datasets, storage, factory=None): # save d into storage and unload it, if a storage is available if storage is not None: + d.element.storage = storage _save_node(d.element) d.element.unload() From a3cbdd45f599d8a61f712b81365cee48ec1eac27 Mon Sep 17 00:00:00 2001 From: Luiz Irber Date: Sat, 31 Oct 2020 14:12:15 -0700 Subject: [PATCH 8/8] comments --- sourmash/sbt.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sourmash/sbt.py b/sourmash/sbt.py index 8ed91b6db5..ead8b6c66e 100644 --- a/sourmash/sbt.py +++ b/sourmash/sbt.py @@ -1371,7 +1371,8 @@ def scaffold(original_datasets, storage, factory=None): raise ValueError("unknown dataset type") del original_datasets - # TODO: we can build the heap in parallel. + # TODO: we can build the heap in parallel, if the data was + # pickle-able for multiprocessing... # on top of doing the count_common calculations in parallel, # we can also avoid building the heap (just build a list first) # and then call heapify on it after the list is ready @@ -1400,7 +1401,8 @@ def scaffold(original_datasets, storage, factory=None): # heapq defaults to a min heap, # invert "common" here to avoid having to use the # internal methods for a max heap - heapq.heappush(heap, (-common, i, j)) + heap.append((-common, i, j)) + heapq.heapify(heap) if factory is None: n_unique_hashes = len(hll) @@ -1409,6 +1411,8 @@ def scaffold(original_datasets, storage, factory=None): #htable_size *= num_htables print(len(hll), num_htables, htable_size) + # TODO: turns out len(hll) is too big in most cases. + # need a better heuristic for optimal size... htable_size = 1e5 num_htables = 1 @@ -1433,7 +1437,7 @@ def scaffold(original_datasets, storage, factory=None): processed.add(p2) # Name will be updated later, when we have the right position - new_node = Node(factory, name=f"internal.XXX") + new_node = Node(factory) data1.update(new_node) data2.update(new_node) @@ -1462,7 +1466,7 @@ def scaffold(original_datasets, storage, factory=None): processed.add(p1) # Name will be updated later, when we have the right position - new_node = Node(factory, name=f"internal.XXX") + new_node = Node(factory) d.update(new_node) new_internal = InternalNode(new_node, BinaryLeaf(d), None) @@ -1478,16 +1482,15 @@ def scaffold(original_datasets, storage, factory=None): next_round = [] total_nodes = len(current_round) - # TODO: we can build the heap in parallel. - # on top of doing the intersection_count calculations in parallel, - # we can also avoid building the heap (just build a list first) - # and then call heapify on it after the list is ready + # TODO: we can build the heap in parallel, if the data was + # pickle-able for multiprocessing... heap = [] for (i, d1) in enumerate(current_round): for (j, d2) in enumerate(current_round): if i > j: common = d1.element.data.intersection_count(d2.element.data) - heapq.heappush(heap, (-common, i, j)) + heap.append((-common, i, j)) + heapq.heapify(heap) processed = set() while heap: