[HUDI-6850] Add tests and docs for ported Bloom Filter classes #9700

Merged: 1 commit, Sep 13, 2023
LICENSE: 15 changes (14 additions, 1 deletion)
@@ -291,7 +291,20 @@ This product includes code from Apache Hadoop

* org.apache.hudi.common.bloom.InternalDynamicBloomFilter.java adapted from org.apache.hadoop.util.bloom.DynamicBloomFilter.java

- * org.apache.hudi.common.bloom.InternalFilter copied from classes in org.apache.hadoop.util.bloom package
* org.apache.hudi.common.bloom.InternalFilter.java adapted from org.apache.hadoop.util.bloom.Filter.java
and org.apache.hadoop.io.Writable.java

* org.apache.hudi.common.bloom.InternalBloomFilter adapted from org.apache.hadoop.util.bloom.BloomFilter.java

* org.apache.hudi.common.bloom.Key.java adapted from org.apache.hadoop.util.bloom.Key.java

* org.apache.hudi.common.bloom.HashFunction.java ported from org.apache.hadoop.util.bloom.HashFunction.java

* org.apache.hudi.common.util.hash.Hash.java ported from org.apache.hadoop.util.hash.Hash.java

* org.apache.hudi.common.util.hash.JenkinsHash.java ported from org.apache.hadoop.util.hash.JenkinsHash.java

* org.apache.hudi.common.util.hash.MurmurHash.java ported from org.apache.hadoop.util.hash.MurmurHash.java

with the following license

org.apache.hudi.common.bloom.HashFunction.java
@@ -16,18 +16,51 @@
* specific language governing permissions and limitations
* under the License.
*/
/**
* Copyright (c) 2005, European Commission project OneLab under contract 034819
* (http://www.one-lab.org)
* <p>
* All rights reserved.
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
* - Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the distribution.
* - Neither the name of the University Catholique de Louvain - UCL
* nor the names of its contributors may be used to endorse or
* promote products derived from this software without specific prior
* written permission.
* <p>
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

package org.apache.hudi.common.bloom;

import org.apache.hudi.common.util.hash.Hash;

/**
* Implements a hash object that returns a certain number of hashed values.
* <p>
* The code in this class is ported from {@link org.apache.hadoop.util.bloom.HashFunction} in Apache Hadoop.
*
* @see Key The general behavior of a key being stored in a bloom filter
* @see InternalBloomFilter The general behavior of a bloom filter
*/
- public class HashFunction {
public final class HashFunction {
/**
* The number of hashed values.
*/
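For orientation, Hadoop's HashFunction, which this class ports, derives its nbHash values by repeatedly re-seeding one underlying hash function with the previous round's output. A minimal sketch of that scheme (field and method names assumed from the Hadoop original, not taken from this diff):

int[] hash(byte[] keyBytes) {
  int[] result = new int[nbHash];
  for (int i = 0, initval = 0; i < nbHash; i++) {
    // Re-seed each round with the previous hash so a single function
    // yields nbHash distinct bit positions in [0, maxValue).
    initval = hashFunction.hash(keyBytes, initval);
    result[i] = Math.abs(initval % maxValue);
  }
  return result;
}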
org.apache.hudi.common.bloom.InternalBloomFilter.java
@@ -57,6 +57,9 @@
* Implements a <i>Bloom filter</i>, as defined by Bloom in 1970.
* <p>
* The code in this class is adapted from {@link org.apache.hadoop.util.bloom.BloomFilter} in Apache Hadoop.
* The serialization and deserialization formats are identical to and compatible with Hadoop's
* {@link org.apache.hadoop.util.bloom.BloomFilter}, so this class correctly reads bloom
* filters serialized by older Hudi versions that used Hadoop's BloomFilter.
* <p>
* Hudi serializes bloom filters and writes them to Parquet file footers and to the metadata table's
* bloom filter partition, which contains bloom filters for all data files. We want to maintain the
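The compatibility guarantee can be pictured as a byte-level round trip. A hedged sketch, assuming InternalBloomFilter keeps the Writable-style no-arg constructor of its Hadoop ancestor (java.io imports omitted):

static InternalBloomFilter roundTrip() throws IOException {
  // Serialize with Hadoop's original BloomFilter ...
  org.apache.hadoop.util.bloom.BloomFilter hadoopFilter =
      new org.apache.hadoop.util.bloom.BloomFilter(
          1024 /* vectorSize */, 5 /* nbHash */, org.apache.hadoop.util.hash.Hash.MURMUR_HASH);
  hadoopFilter.add(new org.apache.hadoop.util.bloom.Key("record-key-1".getBytes()));
  ByteArrayOutputStream bytes = new ByteArrayOutputStream();
  hadoopFilter.write(new DataOutputStream(bytes));

  // ... and read the very same bytes back with the Hudi port.
  InternalBloomFilter hudiFilter = new InternalBloomFilter();
  hudiFilter.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
  return hudiFilter;
}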
org.apache.hudi.common.bloom.InternalFilter.java
@@ -27,7 +27,20 @@
import java.util.List;

/**
- * Ported from {@link org.apache.hadoop.util.bloom.Filter}.
* Defines the general behavior of a filter.
* <p>
* The code in this class is adapted from {@link org.apache.hadoop.util.bloom.Filter} in Apache Hadoop.
* <p>
* A filter is a data structure which aims at offering a lossy summary of a set <code>A</code>. The
* key idea is to map entries of <code>A</code> (also called <i>keys</i>) into several positions
* in a vector through the use of several hash functions.
* <p>
* Typically, a filter will be implemented as a Bloom filter (or a Bloom filter extension).
* <p>
* It must be extended in order to define the real behavior.
*
* @see Key The general behavior of a key
* @see HashFunction A hash function
*/
abstract class InternalFilter {
private static final int VERSION = -1; // negative to accommodate for old format
@@ -160,13 +173,28 @@ public void add(Key[] keys) {
}
} //end add()

/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOutput</code> to serialize this object into.
* @throws IOException
*/
public void write(DataOutput out) throws IOException {
out.writeInt(VERSION);
out.writeInt(this.nbHash);
out.writeByte(this.hashType);
out.writeInt(this.vectorSize);
}

/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deserialize this object from.
* @throws IOException
*/
public void readFields(DataInput in) throws IOException {
int ver = in.readInt();
if (ver > 0) { // old non-versioned format
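The readFields() body is truncated above. For reference, the version handling mirrors Hadoop's Filter.readFields(), roughly as follows (a sketch, not the verbatim port):

int ver = in.readInt();
if (ver > 0) {                        // old non-versioned format: the first
  this.nbHash = ver;                  // int was nbHash itself
  this.hashType = Hash.JENKINS_HASH;  // old format predates hash selection
} else if (ver == VERSION) {          // current format, VERSION == -1
  this.nbHash = in.readInt();
  this.hashType = in.readByte();
} else {
  throw new IOException("Unsupported version: " + ver);
}
this.vectorSize = in.readInt();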
org.apache.hudi.common.bloom.Key.java
@@ -25,10 +25,12 @@

/**
* The general behavior of a key that must be stored in a bloom filter.
* <p>
* The code in this class is adapted from {@link org.apache.hadoop.util.bloom.Key} in Apache Hadoop.
*
* @see InternalBloomFilter The general behavior of a bloom filter and how the key is used.
*/
- public final class Key implements Comparable<Key> {
public class Key implements Comparable<Key> {
/**
* Byte value of key
*/
org.apache.hudi.common.util.hash.Hash.java
@@ -24,6 +24,8 @@
/**
* This class represents a common API for hashing functions used by
* {@link InternalBloomFilter}.
* <p>
* The code in this class is ported from {@link org.apache.hadoop.util.hash.Hash} in Apache Hadoop.
*/
public abstract class Hash {
/**
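The concrete implementations (JenkinsHash and MurmurHash, below) are selected through this API. A hedged usage sketch; getInstance() and the seed convention are assumed from the Hadoop original:

Hash hash = Hash.getInstance(Hash.MURMUR_HASH);  // or Hash.JENKINS_HASH
byte[] key = "record-key-1".getBytes(java.nio.charset.StandardCharsets.UTF_8);
int h = hash.hash(key, -1);  // -1 is Hadoop's default seed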
org.apache.hudi.common.util.hash.JenkinsHash.java
@@ -24,7 +24,9 @@

/**
* Produces a 32-bit hash for hash table lookup.
* <p>
* The code in this class is ported from {@link org.apache.hadoop.util.hash.JenkinsHash} in Apache Hadoop.
* <p>
* <pre>lookup3.c, by Bob Jenkins, May 2006, Public Domain.
*
* You can use this free for any purpose. It's in the public domain.
org.apache.hudi.common.util.hash.MurmurHash.java
@@ -22,7 +22,9 @@
/**
* This is a very fast, non-cryptographic hash suitable for general hash-based
* lookup. See http://murmurhash.googlepages.com/ for more details.
* <p>
* The code in this class is ported from {@link org.apache.hadoop.util.hash.MurmurHash} in Apache Hadoop.
* <p>
* <p>The C version of MurmurHash 2.0 found at that site was ported
* to Java by Andrzej Bialecki (ab at getopt org).</p>
*/
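For reference, the core of the public-domain MurmurHash 2.0 algorithm referenced above; a sketch of the algorithm itself, not necessarily line-for-line identical to this port:

static int murmur2(byte[] data, int seed) {
  final int m = 0x5bd1e995;
  final int r = 24;
  int len = data.length;
  int h = seed ^ len;
  int i = 0;
  while (len >= 4) {  // mix the input four bytes at a time
    int k = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8)
        | ((data[i + 2] & 0xff) << 16) | ((data[i + 3] & 0xff) << 24);
    k *= m;
    k ^= k >>> r;
    k *= m;
    h *= m;
    h ^= k;
    i += 4;
    len -= 4;
  }
  switch (len) {      // fold in the remaining one to three bytes
    case 3: h ^= (data[i + 2] & 0xff) << 16;  // fall through
    case 2: h ^= (data[i + 1] & 0xff) << 8;   // fall through
    case 1: h ^= (data[i] & 0xff);
            h *= m;
  }
  h ^= h >>> 13;      // final avalanche
  h *= m;
  h ^= h >>> 15;
  return h;
}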
@@ -18,15 +18,21 @@

package org.apache.hudi.common.bloom;

import org.apache.hudi.common.util.hash.Hash;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.FileSystemTestUtils.readLastLineFromResourceFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
@@ -92,11 +98,75 @@ public void testSerialize(String typeCode) {
}
}

public static List<Arguments> bloomFilterParams() {
return Arrays.asList(
Arguments.of("hadoop", BloomFilterTypeCode.SIMPLE.name(), 200, 0.000001, Hash.MURMUR_HASH, -1),
Arguments.of("hadoop", BloomFilterTypeCode.SIMPLE.name(), 1000, 0.000001, Hash.MURMUR_HASH, -1),
Arguments.of("hadoop", BloomFilterTypeCode.SIMPLE.name(), 5000, 0.000001, Hash.MURMUR_HASH, -1),
Arguments.of("hadoop", BloomFilterTypeCode.SIMPLE.name(), 10000, 0.000001, Hash.MURMUR_HASH, -1),
Arguments.of("hadoop", BloomFilterTypeCode.SIMPLE.name(), 5000, 0.000001, Hash.JENKINS_HASH, -1),
Arguments.of("hadoop", BloomFilterTypeCode.DYNAMIC_V0.name(), 200, 0.000001, Hash.MURMUR_HASH, 1000),
Arguments.of("hadoop", BloomFilterTypeCode.DYNAMIC_V0.name(), 1000, 0.000001, Hash.MURMUR_HASH, 5000),
Arguments.of("hadoop", BloomFilterTypeCode.DYNAMIC_V0.name(), 1000, 0.000001, Hash.JENKINS_HASH, 5000),
Arguments.of("hudi", BloomFilterTypeCode.SIMPLE.name(), 1000, 0.000001, Hash.MURMUR_HASH, -1),
Arguments.of("hudi", BloomFilterTypeCode.SIMPLE.name(), 5000, 0.000001, Hash.MURMUR_HASH, -1),
Arguments.of("hudi", BloomFilterTypeCode.DYNAMIC_V0.name(), 1000, 0.000001, Hash.MURMUR_HASH, 5000)
);
}

@ParameterizedTest
@MethodSource("bloomFilterParams")
public void testDeserialize(String lib, String typeCode, int numEntries,
double errorRate, int hashType, int maxEntries) throws IOException {
// When "lib" is "hadoop", this tests backwards compatibility: Hudi's
// InternalBloomFilter must correctly read bloom filters serialized by Hadoop
List<String> keyList = Arrays.stream(
readLastLineFromResourceFile("/format/bloom-filter/hadoop/all_10000.keys.data").split(","))
.collect(Collectors.toList());
String serializedFilter;
if ("hadoop".equals(lib)) {
String fileName = (BloomFilterTypeCode.DYNAMIC_V0.name().equals(typeCode) ? "dynamic" : "simple")
+ "_" + numEntries
+ "_000001_"
+ (hashType == Hash.MURMUR_HASH ? "murmur" : "jenkins")
+ (BloomFilterTypeCode.DYNAMIC_V0.name().equals(typeCode) ? "_" + maxEntries : "")
+ ".bf.data";
serializedFilter = readLastLineFromResourceFile("/format/bloom-filter/hadoop/" + fileName);
} else {
BloomFilter inputFilter = getBloomFilter(typeCode, numEntries, errorRate, maxEntries);
for (String key : keyList) {
inputFilter.add(key);
}
serializedFilter = inputFilter.serializeToString();
}
validateBloomFilter(
serializedFilter, keyList, lib, typeCode, numEntries, errorRate, hashType, maxEntries);
}
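For example, the DYNAMIC_V0 row with 1000 entries, MURMUR_HASH, and maxEntries of 5000 resolves to the resource file /format/bloom-filter/hadoop/dynamic_1000_000001_murmur_5000.bf.data, pre-generated with Hadoop's own bloom filter classes.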

BloomFilter getBloomFilter(String typeCode, int numEntries, double errorRate, int maxEntries) {
if (typeCode.equalsIgnoreCase(BloomFilterTypeCode.SIMPLE.name())) {
return BloomFilterFactory.createBloomFilter(numEntries, errorRate, -1, typeCode);
} else {
return BloomFilterFactory.createBloomFilter(numEntries, errorRate, maxEntries, typeCode);
}
}
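For context, the standard sizing math behind the numEntries/errorRate parameters (a hedged sketch; BloomFilterFactory's exact formula and rounding may differ):

// m = -n * ln(p) / (ln 2)^2 bits, k = (m / n) * ln 2 hash functions
static int optimalBitCount(int numEntries, double errorRate) {
  return (int) Math.ceil(-numEntries * Math.log(errorRate) / (Math.log(2) * Math.log(2)));
}

static int optimalHashCount(int numEntries, int bitCount) {
  return Math.max(1, (int) Math.round((double) bitCount / numEntries * Math.log(2)));
}

At numEntries = 1000 and errorRate = 0.000001 this gives roughly 28,756 bits and 20 hash functions.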

private void validateBloomFilter(String serializedFilter, List<String> keyList, String lib,
String typeCode, int numEntries, double errorRate,
int hashType, int maxEntries) {
BloomFilter bloomFilter = BloomFilterFactory
.fromString(serializedFilter, typeCode);
for (String key : keyList) {
assertTrue(bloomFilter.mightContain(key), "Filter should have returned true for " + key);
}
if ("hadoop".equals(lib) && hashType == Hash.MURMUR_HASH) {
BloomFilter hudiBloomFilter = getBloomFilter(typeCode, numEntries, errorRate, maxEntries);
for (String key : keyList) {
hudiBloomFilter.add(key);
}
// A bloom filter serialized by the Hadoop library should be byte-for-byte identical
// to one serialized by Hudi, unless we customize the format in the future
assertEquals(hudiBloomFilter.serializeToString(), serializedFilter);
}
}
}
org.apache.hudi.common.table.log.TestLogReaderUtils.java
@@ -19,13 +19,10 @@

package org.apache.hudi.common.table.log;

- import org.apache.hudi.common.util.FileIOUtils;

import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.IOException;
- import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@@ -35,6 +32,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.hudi.common.testutils.FileSystemTestUtils.readLastLineFromResourceFile;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

@@ -92,11 +90,4 @@ public static void assertPositionEquals(List<Long> expectedPositions,
assertFalse(expectedIterator.hasNext());
assertFalse(iterator.hasNext());
}

- private String readLastLineFromResourceFile(String resourceName) throws IOException {
-   try (InputStream inputStream = TestLogReaderUtils.class.getResourceAsStream(resourceName)) {
-     List<String> lines = FileIOUtils.readAsUTFStringLines(inputStream);
-     return lines.get(lines.size() - 1);
-   }
- }
}
org.apache.hudi.common.testutils.FileSystemTestUtils.java
@@ -21,6 +21,8 @@
import org.apache.hudi.common.fs.inline.InLineFSUtils;
import org.apache.hudi.common.fs.inline.InLineFileSystem;
import org.apache.hudi.common.fs.inline.InMemoryFileSystem;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
import org.apache.hudi.common.util.FileIOUtils;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -30,6 +32,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -86,4 +89,11 @@ public static List<FileStatus> listFiles(FileSystem fs, Path path, boolean recur
}
return statuses;
}

public static String readLastLineFromResourceFile(String resourceName) throws IOException {
try (InputStream inputStream = TestLogReaderUtils.class.getResourceAsStream(resourceName)) {
List<String> lines = FileIOUtils.readAsUTFStringLines(inputStream);
return lines.get(lines.size() - 1);
}
}
}
@@ -18,7 +18,6 @@

package org.apache.hudi.common.testutils;

- import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -34,6 +33,8 @@
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.junit.jupiter.api.Assumptions;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -44,7 +45,6 @@
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
- import org.junit.jupiter.api.Assumptions;

/**
* A utility class for testing.

Large diffs for 3 additional files are not rendered by default.