diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java index c2433efc5..cfb21d9da 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java @@ -29,6 +29,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.bloom.FlinkHoodieBloomIndex; import org.apache.hudi.index.state.FlinkInMemoryStateIndex; import org.apache.hudi.PublicAPIMethod; import org.apache.hudi.table.HoodieTable; @@ -58,6 +59,8 @@ public abstract class FlinkHoodieIndex extends Ho switch (config.getIndexType()) { case INMEMORY: return new FlinkInMemoryStateIndex<>(context, config); + case BLOOM: + return new FlinkHoodieBloomIndex(config); default: throw new HoodieIndexException("Unsupported index type " + config.getIndexType()); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java new file mode 100644 index 000000000..6a3edc7c3 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/FlinkHoodieBloomIndex.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.MetadataNotFoundException; +import org.apache.hudi.index.FlinkHoodieIndex; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.io.HoodieRangeInfoHandle; +import org.apache.hudi.table.HoodieTable; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import com.beust.jcommander.internal.Lists; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import scala.Tuple2; + +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.toList; +import static org.apache.hudi.index.HoodieIndexUtils.getLatestBaseFilesForAllPartitions; + +/** + * Indexing mechanism based on bloom filter. Each parquet file includes its row_key bloom filter in its metadata. + */ +@SuppressWarnings("checkstyle:LineLength") +public class FlinkHoodieBloomIndex extends FlinkHoodieIndex { + + private static final Logger LOG = LogManager.getLogger(FlinkHoodieBloomIndex.class); + + public FlinkHoodieBloomIndex(HoodieWriteConfig config) { + super(config); + } + + @Override + public List> tagLocation(List> records, HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) { + // Step 1: Extract out thinner Map of (partitionPath, recordKey) + Map> partitionRecordKeyMap = new HashMap<>(); + records.forEach(record -> { + if (partitionRecordKeyMap.containsKey(record.getPartitionPath())) { + partitionRecordKeyMap.get(record.getPartitionPath()).add(record.getRecordKey()); + } else { + List recordKeys = Lists.newArrayList(); + recordKeys.add(record.getRecordKey()); + partitionRecordKeyMap.put(record.getPartitionPath(), recordKeys); + } + }); + + // Step 2: Lookup indexes for all the partition/recordkey pair + Map keyFilenamePairMap = + lookupIndex(partitionRecordKeyMap, context, hoodieTable); + + if (LOG.isDebugEnabled()) { + long totalTaggedRecords = keyFilenamePairMap.values().size(); + LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords); + } + + // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys + List> taggedRecords = tagLocationBacktoRecords(keyFilenamePairMap, records); + + return taggedRecords; + } + + /** + * Lookup the location for each record key and return the pair for all record keys already + * present and drop the record keys if not present. + */ + private Map lookupIndex( + Map> partitionRecordKeyMap, final HoodieEngineContext context, + final HoodieTable hoodieTable) { + // Obtain records per partition, in the incoming records + Map recordsPerPartition = new HashMap<>(); + partitionRecordKeyMap.keySet().forEach(k -> recordsPerPartition.put(k, Long.valueOf(partitionRecordKeyMap.get(k).size()))); + List affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet()); + + // Step 2: Load all involved files as pairs + List> fileInfoList = + loadInvolvedFiles(affectedPartitionPathList, context, hoodieTable); + final Map> partitionToFileInfo = + fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList()))); + + // Step 3: Obtain a List, for each incoming record, that already exists, with the file id, + // that contains it. + List> fileComparisons = + explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyMap); + return findMatchingFilesForRecordKeys(fileComparisons, hoodieTable); + } + + /** + * Load all involved files as pair List. + */ + //TODO duplicate code with spark, we can optimize this method later + List> loadInvolvedFiles(List partitions, final HoodieEngineContext context, + final HoodieTable hoodieTable) { + // Obtain the latest data files from all the partitions. + List> partitionPathFileIDList = getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() + .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId())) + .collect(toList()); + + if (config.getBloomIndexPruneByRanges()) { + // also obtain file ranges, if range pruning is enabled + context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)"); + return context.map(partitionPathFileIDList, pf -> { + try { + HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf); + String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys(); + return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1])); + } catch (MetadataNotFoundException me) { + LOG.warn("Unable to find range metadata in file :" + pf); + return new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue())); + } + }, Math.max(partitionPathFileIDList.size(), 1)); + } else { + return partitionPathFileIDList.stream() + .map(pf -> new Tuple2<>(pf.getKey(), new BloomIndexFileInfo(pf.getValue()))).collect(toList()); + } + } + + @Override + public boolean rollbackCommit(String instantTime) { + // Nope, don't need to do anything. + return true; + } + + /** + * This is not global, since we depend on the partitionPath to do the lookup. + */ + @Override + public boolean isGlobal() { + return false; + } + + /** + * No indexes into log files yet. + */ + @Override + public boolean canIndexLogFiles() { + return false; + } + + /** + * Bloom filters are stored, into the same data files. + */ + @Override + public boolean isImplicitWithStorage() { + return true; + } + + /** + * For each incoming record, produce N output records, 1 each for each file against which the record's key needs to be + * checked. For tables, where the keys have a definite insert order (e.g: timestamp as prefix), the number of files + * to be compared gets cut down a lot from range pruning. + *

+ * Sub-partition to ensure the records can be looked up against files & also prune file<=>record comparisons based on + * recordKey ranges in the index info. + */ + List> explodeRecordsWithFileComparisons( + final Map> partitionToFileIndexInfo, + Map> partitionRecordKeyMap) { + IndexFileFilter indexFileFilter = + config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo) + : new ListBasedIndexFileFilter(partitionToFileIndexInfo); + + List> fileRecordPairs = new ArrayList<>(); + partitionRecordKeyMap.keySet().forEach(partitionPath -> { + List hoodieRecordKeys = partitionRecordKeyMap.get(partitionPath); + hoodieRecordKeys.forEach(hoodieRecordKey -> { + indexFileFilter.getMatchingFilesAndPartition(partitionPath, hoodieRecordKey).forEach(partitionFileIdPair -> { + fileRecordPairs.add(new Tuple2<>(partitionFileIdPair.getRight(), + new HoodieKey(hoodieRecordKey, partitionPath))); + }); + }); + }); + return fileRecordPairs; + } + + /** + * Find out pair. + */ + Map findMatchingFilesForRecordKeys( + List> fileComparisons, + HoodieTable hoodieTable) { + + fileComparisons = fileComparisons.stream().sorted((o1, o2) -> o1._1.compareTo(o2._1)).collect(toList()); + + List keyLookupResults = new ArrayList<>(); + + Iterator> iterator = new HoodieFlinkBloomIndexCheckFunction(hoodieTable, config).apply(fileComparisons.iterator()); + while (iterator.hasNext()) { + keyLookupResults.addAll(iterator.next()); + } + + Map hoodieRecordLocationMap = new HashMap<>(); + + keyLookupResults = keyLookupResults.stream().filter(lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList()); + keyLookupResults.forEach(lookupResult -> { + lookupResult.getMatchingRecordKeys().forEach(r -> { + hoodieRecordLocationMap.put(new HoodieKey(r, lookupResult.getPartitionPath()), new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())); + }); + }); + + return hoodieRecordLocationMap; + } + + + /** + * Tag the back to the original HoodieRecord List. + */ + protected List> tagLocationBacktoRecords( + Map keyFilenamePair, List> records) { + Map> keyRecordPairMap = new HashMap<>(); + records.forEach(r -> keyRecordPairMap.put(r.getKey(), r)); + // Here as the record might have more data than rowKey (some rowKeys' fileId is null), + // so we do left outer join. + List, HoodieRecordLocation>> newList = new ArrayList<>(); + keyRecordPairMap.keySet().forEach(k -> { + if (keyFilenamePair.containsKey(k)) { + newList.add(new Tuple2(keyRecordPairMap.get(k), keyFilenamePair.get(k))); + } else { + newList.add(new Tuple2(keyRecordPairMap.get(k), null)); + } + }); + List> res = Lists.newArrayList(); + for (Tuple2, HoodieRecordLocation> v : newList) { + res.add(HoodieIndexUtils.getTaggedRecord(v._1, Option.ofNullable(v._2))); + } + return res; + } + + @Override + public List updateLocation(List writeStatusList, HoodieEngineContext context, + HoodieTable>, List, List> hoodieTable) { + return writeStatusList; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java new file mode 100644 index 000000000..33ec9e65d --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/bloom/HoodieFlinkBloomIndexCheckFunction.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.io.HoodieKeyLookupHandle.KeyLookupResult; +import org.apache.hudi.table.HoodieTable; + +import java.util.function.Function; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import scala.Tuple2; + +/** + * Function performing actual checking of list containing (fileId, hoodieKeys) against the actual files. + */ +//TODO we can move this class into the hudi-client-common and reuse it for spark client +public class HoodieFlinkBloomIndexCheckFunction + implements Function>, Iterator>> { + + private final HoodieTable hoodieTable; + + private final HoodieWriteConfig config; + + public HoodieFlinkBloomIndexCheckFunction(HoodieTable hoodieTable, HoodieWriteConfig config) { + this.hoodieTable = hoodieTable; + this.config = config; + } + + @Override + public Iterator> apply(Iterator> fileParitionRecordKeyTripletItr) { + return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr); + } + + @Override + public Function>> compose(Function>> before) { + return null; + } + + @Override + public Function>, V> andThen(Function>, ? extends V> after) { + return null; + } + + class LazyKeyCheckIterator extends LazyIterableIterator, List> { + + private HoodieKeyLookupHandle keyLookupHandle; + + LazyKeyCheckIterator(Iterator> filePartitionRecordKeyTripletItr) { + super(filePartitionRecordKeyTripletItr); + } + + @Override + protected void start() { + } + + @Override + protected List computeNext() { + List ret = new ArrayList<>(); + try { + // process one file in each go. + while (inputItr.hasNext()) { + Tuple2 currentTuple = inputItr.next(); + String fileId = currentTuple._1; + String partitionPath = currentTuple._2.getPartitionPath(); + String recordKey = currentTuple._2.getRecordKey(); + Pair partitionPathFilePair = Pair.of(partitionPath, fileId); + + // lazily init state + if (keyLookupHandle == null) { + keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); + } + + // if continue on current file + if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) { + keyLookupHandle.addKey(recordKey); + } else { + // do the actual checking of file & break out + ret.add(keyLookupHandle.getLookupResult()); + keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair); + keyLookupHandle.addKey(recordKey); + break; + } + } + + // handle case, where we ran out of input, close pending work, update return val + if (!inputItr.hasNext()) { + ret.add(keyLookupHandle.getLookupResult()); + } + } catch (Throwable e) { + if (e instanceof HoodieException) { + throw e; + } + throw new HoodieIndexException("Error checking bloom filter index. ", e); + } + return ret; + } + + @Override + protected void end() { + } + } +} diff --git a/hudi-client/hudi-flink-client/src/main/resources/log4j-surefire.properties b/hudi-client/hudi-flink-client/src/main/resources/log4j-surefire.properties new file mode 100644 index 000000000..32af46209 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/main/resources/log4j-surefire.properties @@ -0,0 +1,31 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +log4j.rootLogger=WARN, CONSOLE +log4j.logger.org.apache=INFO +log4j.logger.org.apache.hudi=DEBUG +log4j.logger.org.apache.hadoop.hbase=ERROR + +# A1 is set to be a ConsoleAppender. +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +# A1 uses PatternLayout. +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n +log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter +log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true +log4j.appender.CONSOLE.filter.a.LevelMin=WARN +log4j.appender.CONSOLE.filter.a.LevelMax=FATAL diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java new file mode 100644 index 000000000..0dc699779 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bloom; + +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.io.HoodieKeyLookupHandle; +import org.apache.hudi.table.HoodieFlinkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.HoodieFlinkClientTestHarness; +import org.apache.hudi.testutils.HoodieFlinkWriteableTestTable; + +import org.apache.avro.Schema; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import scala.Tuple2; + +import static java.util.Arrays.asList; +import static java.util.UUID.randomUUID; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit test against FlinkHoodieBloomIndex. + */ +//TODO merge code with Spark Bloom index tests. +public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness { + + private static final Schema SCHEMA = getSchemaFromResource(TestFlinkHoodieBloomIndex.class, "/exampleSchema.avsc", true); + private static final String TEST_NAME_WITH_PARAMS = "[{index}] Test with rangePruning={0}, treeFiltering={1}, bucketizedChecking={2}"; + + public static Stream configParams() { + Object[][] data = + new Object[][] {{true, true, true}, {false, true, true}, {true, true, false}, {true, false, true}}; + return Stream.of(data).map(Arguments::of); + } + + @BeforeEach + public void setUp() throws Exception { + initPath(); + initFileSystem(); + // We have some records to be tagged (two different partitions) + initMetaClient(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + private HoodieWriteConfig makeConfig(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { + return HoodieWriteConfig.newBuilder().withPath(basePath) + .withIndexConfig(HoodieIndexConfig.newBuilder().bloomIndexPruneByRanges(rangePruning) + .bloomIndexTreebasedFilter(treeFiltering).bloomIndexBucketizedChecking(bucketizedChecking) + .bloomIndexKeysPerBucket(2).build()) + .build(); + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { + HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); + FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config); + HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); + HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); + + // Create some partitions, and put some files + // "2016/01/21": 0 file + // "2016/04/01": 1 file (2_0_20160401010101.parquet) + // "2015/03/12": 3 files (1_0_20150312101010.parquet, 3_0_20150312101010.parquet, 4_0_20150312101010.parquet) + testTable.withPartitionMetaFiles("2016/01/21", "2016/04/01", "2015/03/12"); + + RawTripTestPayload rowChange1 = + new RawTripTestPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record1 = + new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + RawTripTestPayload rowChange2 = + new RawTripTestPayload("{\"_row_key\":\"001\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record2 = + new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + RawTripTestPayload rowChange3 = + new RawTripTestPayload("{\"_row_key\":\"002\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record3 = + new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); + RawTripTestPayload rowChange4 = + new RawTripTestPayload("{\"_row_key\":\"003\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord record4 = + new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + + List partitions = asList("2016/01/21", "2016/04/01", "2015/03/12"); + List> filesList = index.loadInvolvedFiles(partitions, context, hoodieTable); + // Still 0, as no valid commit + assertEquals(0, filesList.size()); + + testTable.addCommit("20160401010101").withInserts("2016/04/01", "2"); + testTable.addCommit("20150312101010").withInserts("2015/03/12", "1") + .withInserts("2015/03/12", "3", record1) + .withInserts("2015/03/12", "4", record2, record3, record4); + metaClient.reloadActiveTimeline(); + + filesList = index.loadInvolvedFiles(partitions, context, hoodieTable); + assertEquals(4, filesList.size()); + + if (rangePruning) { + // these files will not have the key ranges + assertNull(filesList.get(0)._2().getMaxRecordKey()); + assertNull(filesList.get(0)._2().getMinRecordKey()); + assertFalse(filesList.get(1)._2().hasKeyRanges()); + assertNotNull(filesList.get(2)._2().getMaxRecordKey()); + assertNotNull(filesList.get(2)._2().getMinRecordKey()); + assertTrue(filesList.get(3)._2().hasKeyRanges()); + + // no longer sorted, but should have same files. + + List> expected = + asList(new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", "000")), + new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", "003"))); + assertEquals(expected, filesList); + } + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testRangePruning(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { + HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); + FlinkHoodieBloomIndex index = new FlinkHoodieBloomIndex(config); + + final Map> partitionToFileIndexInfo = new HashMap<>(); + partitionToFileIndexInfo.put("2017/10/22", + asList(new BloomIndexFileInfo("f1"), new BloomIndexFileInfo("f2", "000", "000"), + new BloomIndexFileInfo("f3", "001", "003"), new BloomIndexFileInfo("f4", "002", "007"), + new BloomIndexFileInfo("f5", "009", "010"))); + + Map> partitionRecordKeyMap = new HashMap<>(); + asList(new Tuple2<>("2017/10/22", "003"), new Tuple2<>("2017/10/22", "002"), + new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", "004")) + .forEach(t -> { + List recordKeyList = partitionRecordKeyMap.getOrDefault(t._1, new ArrayList<>()); + recordKeyList.add(t._2); + partitionRecordKeyMap.put(t._1, recordKeyList); + }); + + List> comparisonKeyList = + index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyMap); + + assertEquals(10, comparisonKeyList.size()); + java.util.Map> recordKeyToFileComps = comparisonKeyList.stream() + .collect(java.util.stream.Collectors.groupingBy(t -> t._2.getRecordKey(), java.util.stream.Collectors.mapping(t -> t._1, java.util.stream.Collectors.toList()))); + + assertEquals(4, recordKeyToFileComps.size()); + assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("002"))); + assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("003"))); + assertEquals(new java.util.HashSet<>(asList("f1", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("004"))); + assertEquals(new java.util.HashSet<>(asList("f1", "f4")), new java.util.HashSet<>(recordKeyToFileComps.get("005"))); + } + + @Test + public void testCheckUUIDsAgainstOneFile() throws Exception { + final String partition = "2016/01/31"; + // Create some records to use + String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + String recordStr4 = "{\"_row_key\":\"4eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":32}"; + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + HoodieRecord record1 = + new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + HoodieRecord record2 = + new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + HoodieRecord record3 = + new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); + RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4); + HoodieRecord record4 = + new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + + // We write record1, record2 to a parquet file, but the bloom filter contains (record1, + // record2, record3). + BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); + filter.add(record3.getRecordKey()); + HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA, filter); + String fileId = testTable.addCommit("000").getFileIdWithInserts(partition, record1, record2); + String filename = testTable.getBaseFileNameById(fileId); + + // The bloom filter contains 3 records + assertTrue(filter.mightContain(record1.getRecordKey())); + assertTrue(filter.mightContain(record2.getRecordKey())); + assertTrue(filter.mightContain(record3.getRecordKey())); + assertFalse(filter.mightContain(record4.getRecordKey())); + + // Compare with file + List uuids = asList(record1.getRecordKey(), record2.getRecordKey(), record3.getRecordKey(), record4.getRecordKey()); + + HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build(); + HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient); + HoodieKeyLookupHandle keyHandle = new HoodieKeyLookupHandle<>(config, table, Pair.of(partition, fileId)); + List results = keyHandle.checkCandidatesAgainstFile(hadoopConf, uuids, + new Path(java.nio.file.Paths.get(basePath, partition, filename).toString())); + assertEquals(results.size(), 2); + assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") + || results.get(1).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")); + assertTrue(results.get(0).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0") + || results.get(1).equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")); + // TODO(vc): Need more coverage on actual filenames + // assertTrue(results.get(0)._2().equals(filename)); + // assertTrue(results.get(1)._2().equals(filename)); + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testTagLocationWithEmptyList(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) { + // We have some records to be tagged (two different partitions) + List records = new ArrayList<>(); + // Also create the metadata and config + HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieFlinkTable table = HoodieFlinkTable.create(config, context, metaClient); + + // Let's tag + FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config); + + assertDoesNotThrow(() -> { + bloomIndex.tagLocation(records, context, table); + }, "EmptyList should not result in IllegalArgumentException: Positive number of slices required"); + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testTagLocation(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { + // We have some records to be tagged (two different partitions) + String rowKey1 = randomUUID().toString(); + String rowKey2 = randomUUID().toString(); + String rowKey3 = randomUUID().toString(); + String recordStr1 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"" + rowKey2 + "\",\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"" + rowKey3 + "\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + // place same row key under a different partition. + String recordStr4 = "{\"_row_key\":\"" + rowKey1 + "\",\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + HoodieRecord record1 = + new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + HoodieRecord record2 = + new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + HoodieRecord record3 = + new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3); + RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4); + HoodieRecord record4 = + new HoodieRecord(new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()), rowChange4); + List records = asList(record1, record2, record3, record4); + + // Also create the metadata and config + HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); + HoodieFlinkTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); + HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); + + // Let's tag + FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config); + List taggedRecords = bloomIndex.tagLocation(records, context, hoodieTable); + + // Should not find any files + for (HoodieRecord record : taggedRecords) { + assertFalse(record.isCurrentLocationKnown()); + } + + // We create three parquet file, each having one record. (two different partitions) + String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1); + String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2); + String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4); + + metaClient.reloadActiveTimeline(); + + // We do the tag again + taggedRecords = bloomIndex.tagLocation(records, context, HoodieFlinkTable.create(config, context, metaClient)); + + // Check results + for (HoodieRecord record : taggedRecords) { + if (record.getRecordKey().equals(rowKey1)) { + if (record.getPartitionPath().equals("2015/01/31")) { + assertEquals(record.getCurrentLocation().getFileId(), fileId3); + } else { + assertEquals(record.getCurrentLocation().getFileId(), fileId1); + } + } else if (record.getRecordKey().equals(rowKey2)) { + assertEquals(record.getCurrentLocation().getFileId(), fileId2); + } else if (record.getRecordKey().equals(rowKey3)) { + assertFalse(record.isCurrentLocationKnown()); + } + } + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testCheckExists(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { + // We have some records to be tagged (two different partitions) + + String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + String recordStr3 = "{\"_row_key\":\"3eb5b87c-1fej-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}"; + // record key same as recordStr2 + String recordStr4 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2015-01-31T03:16:41.415Z\",\"number\":32}"; + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + HoodieKey key1 = new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()); + HoodieRecord record1 = new HoodieRecord(key1, rowChange1); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + HoodieKey key2 = new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()); + HoodieRecord record2 = new HoodieRecord(key2, rowChange2); + RawTripTestPayload rowChange3 = new RawTripTestPayload(recordStr3); + HoodieKey key3 = new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()); + RawTripTestPayload rowChange4 = new RawTripTestPayload(recordStr4); + HoodieKey key4 = new HoodieKey(rowChange4.getRowKey(), rowChange4.getPartitionPath()); + HoodieRecord record4 = new HoodieRecord(key4, rowChange4); + List keys = asList(key1, key2, key3, key4); + + // Also create the metadata and config + HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); + HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient); + HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA); + + // Let's tag + FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config); + List toTagRecords = new ArrayList<>(); + toTagRecords.add(new HoodieRecord(record4.getKey(), null)); + List taggedRecords = bloomIndex.tagLocation(toTagRecords, context, hoodieTable); + Map>> recordLocations = new HashMap<>(); + for (HoodieRecord taggedRecord : taggedRecords) { + recordLocations.put(taggedRecord.getKey(), taggedRecord.isCurrentLocationKnown() + ? Option.of(Pair.of(taggedRecord.getPartitionPath(), taggedRecord.getCurrentLocation().getFileId())) + : Option.empty()); + } + // Should not find any files + for (Option> record : recordLocations.values()) { + assertTrue(!record.isPresent()); + } + + // We create three parquet file, each having one record. (two different partitions) + String fileId1 = testTable.addCommit("001").getFileIdWithInserts("2016/01/31", record1); + String fileId2 = testTable.addCommit("002").getFileIdWithInserts("2016/01/31", record2); + String fileId3 = testTable.addCommit("003").getFileIdWithInserts("2015/01/31", record4); + + // We do the tag again + metaClient = HoodieTableMetaClient.reload(metaClient); + hoodieTable = HoodieFlinkTable.create(config, context, metaClient); + List toTagRecords1 = new ArrayList<>(); + for (HoodieKey key : keys) { + taggedRecords.add(new HoodieRecord(key, null)); + } + + taggedRecords = bloomIndex.tagLocation(toTagRecords1, context, hoodieTable); + recordLocations.clear(); + for (HoodieRecord taggedRecord : taggedRecords) { + recordLocations.put(taggedRecord.getKey(), taggedRecord.isCurrentLocationKnown() + ? Option.of(Pair.of(taggedRecord.getPartitionPath(), taggedRecord.getCurrentLocation().getFileId())) + : Option.empty()); + } + + // Check results + for (Map.Entry>> record : recordLocations.entrySet()) { + if (record.getKey().getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { + assertTrue(record.getValue().isPresent()); + assertEquals(fileId1, record.getValue().get().getRight()); + } else if (record.getKey().getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { + assertTrue(record.getValue().isPresent()); + if (record.getKey().getPartitionPath().equals("2015/01/31")) { + assertEquals(fileId3, record.getValue().get().getRight()); + } else { + assertEquals(fileId2, record.getValue().get().getRight()); + } + } else if (record.getKey().getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) { + assertFalse(record.getValue().isPresent()); + } + } + } + + @ParameterizedTest(name = TEST_NAME_WITH_PARAMS) + @MethodSource("configParams") + public void testBloomFilterFalseError(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception { + // We have two hoodie records + String recordStr1 = "{\"_row_key\":\"1eb5b87a-1feh-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"; + String recordStr2 = "{\"_row_key\":\"2eb5b87b-1feu-4edd-87b4-6ec96dc405a0\"," + + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}"; + + // We write record1 to a parquet file, using a bloom filter having both records + RawTripTestPayload rowChange1 = new RawTripTestPayload(recordStr1); + HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1); + RawTripTestPayload rowChange2 = new RawTripTestPayload(recordStr2); + HoodieRecord record2 = new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2); + + BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); + filter.add(record2.getRecordKey()); + HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(metaClient, SCHEMA, filter); + String fileId = testTable.addCommit("000").getFileIdWithInserts("2016/01/31", record1); + assertTrue(filter.mightContain(record1.getRecordKey())); + assertTrue(filter.mightContain(record2.getRecordKey())); + + // We do the tag + List records = asList(record1, record2); + HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking); + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieFlinkTable.create(config, context, metaClient); + + FlinkHoodieBloomIndex bloomIndex = new FlinkHoodieBloomIndex(config); + List taggedRecords = bloomIndex.tagLocation(records, context, table); + + // Check results + for (HoodieRecord record : taggedRecords) { + if (record.getKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) { + assertEquals(record.getCurrentLocation().getFileId(), fileId); + } else if (record.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) { + assertFalse(record.isCurrentLocationKnown()); + } + } + } +} diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java index 4bd9fa39f..171bab9fb 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkClientTestHarness.java @@ -18,21 +18,31 @@ package org.apache.hudi.testutils; +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.index.bloom.TestFlinkHoodieBloomIndex; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; + import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; @@ -40,7 +50,11 @@ import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; +/** + * The test harness for resource initialization and cleanup. + */ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implements Serializable { protected static final Logger LOG = LogManager.getLogger(HoodieFlinkClientTestHarness.class); @@ -48,6 +62,17 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem protected transient Configuration hadoopConf = null; protected transient FileSystem fs; protected transient MiniClusterWithClientResource flinkCluster = null; + protected transient HoodieFlinkEngineContext context = null; + protected transient ExecutorService executorService; + protected transient HoodieFlinkWriteClient writeClient; + protected transient HoodieTableFileSystemView tableView; + + protected final FlinkTaskContextSupplier supplier = new FlinkTaskContextSupplier(null); + + // dfs + protected transient HdfsTestService hdfsTestService; + protected transient MiniDFSCluster dfsCluster; + protected transient DistributedFileSystem dfs; @BeforeEach public void setTestMethodName(TestInfo testInfo) { @@ -69,6 +94,7 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem protected void initFileSystem() { hadoopConf = new Configuration(); initFileSystemWithConfiguration(hadoopConf); + context = new HoodieFlinkEngineContext(supplier); } private void initFileSystemWithConfiguration(Configuration configuration) { @@ -116,6 +142,19 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem } } + /** + * Cleanups resource group for the subclasses of {@link TestFlinkHoodieBloomIndex}. + */ + public void cleanupResources() throws java.io.IOException { + cleanupClients(); + cleanupFlinkContexts(); + cleanupTestDataGenerator(); + cleanupFileSystem(); + cleanupDFS(); + cleanupExecutorService(); + System.gc(); + } + protected void cleanupFlinkMiniCluster() { if (flinkCluster != null) { flinkCluster.after(); @@ -133,4 +172,59 @@ public class HoodieFlinkClientTestHarness extends HoodieCommonTestHarness implem valuesList.add(value); } } + + /** + * Cleanups hoodie clients. + */ + protected void cleanupClients() throws java.io.IOException { + if (metaClient != null) { + metaClient = null; + } + if (writeClient != null) { + writeClient.close(); + writeClient = null; + } + if (tableView != null) { + tableView.close(); + tableView = null; + } + } + + /** + * Cleanups the distributed file system. + * + * @throws IOException + */ + protected void cleanupDFS() throws java.io.IOException { + if (hdfsTestService != null) { + hdfsTestService.stop(); + dfsCluster.shutdown(); + hdfsTestService = null; + dfsCluster = null; + dfs = null; + } + // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the + // same JVM + FileSystem.closeAll(); + } + + /** + * Cleanups the executor service. + */ + protected void cleanupExecutorService() { + if (this.executorService != null) { + this.executorService.shutdownNow(); + this.executorService = null; + } + } + + /** + * Cleanups Flink contexts. + */ + protected void cleanupFlinkContexts() { + if (context != null) { + LOG.info("Closing flink engine context used in previous test-case"); + context = null; + } + } } diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java new file mode 100644 index 000000000..60ae294e6 --- /dev/null +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.testutils; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.bloom.BloomFilter; +import org.apache.hudi.common.bloom.BloomFilterFactory; +import org.apache.hudi.common.bloom.BloomFilterTypeCode; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType; +import org.apache.hudi.table.HoodieTable; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.Path; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class HoodieFlinkWriteableTestTable extends HoodieWriteableTestTable { + private static final Logger LOG = LogManager.getLogger(HoodieFlinkWriteableTestTable.class); + + private HoodieFlinkWriteableTestTable(String basePath, org.apache.hadoop.fs.FileSystem fs, HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) { + super(basePath, fs, metaClient, schema, filter); + } + + public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema, BloomFilter filter) { + return new HoodieFlinkWriteableTestTable(metaClient.getBasePath(), metaClient.getRawFs(), metaClient, schema, filter); + } + + public static HoodieFlinkWriteableTestTable of(HoodieTableMetaClient metaClient, Schema schema) { + BloomFilter filter = BloomFilterFactory.createBloomFilter(10000, 0.0000001, -1, BloomFilterTypeCode.SIMPLE.name()); + return of(metaClient, schema, filter); + } + + public static HoodieFlinkWriteableTestTable of(HoodieTable hoodieTable, Schema schema) { + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + return of(metaClient, schema); + } + + public static HoodieFlinkWriteableTestTable of(HoodieTable hoodieTable, Schema schema, BloomFilter filter) { + HoodieTableMetaClient metaClient = hoodieTable.getMetaClient(); + return of(metaClient, schema, filter); + } + + @Override + public HoodieFlinkWriteableTestTable addCommit(String instantTime) throws Exception { + return (HoodieFlinkWriteableTestTable) super.addCommit(instantTime); + } + + @Override + public HoodieFlinkWriteableTestTable forCommit(String instantTime) { + return (HoodieFlinkWriteableTestTable) super.forCommit(instantTime); + } + + public String getFileIdWithInserts(String partition) throws Exception { + return getFileIdWithInserts(partition, new HoodieRecord[0]); + } + + public String getFileIdWithInserts(String partition, HoodieRecord... records) throws Exception { + return getFileIdWithInserts(partition, Arrays.asList(records)); + } + + public String getFileIdWithInserts(String partition, List records) throws Exception { + String fileId = java.util.UUID.randomUUID().toString(); + withInserts(partition, fileId, records); + return fileId; + } + + public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId) throws Exception { + return withInserts(partition, fileId, new HoodieRecord[0]); + } + + public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId, HoodieRecord... records) throws Exception { + return withInserts(partition, fileId, Arrays.asList(records)); + } + + public HoodieFlinkWriteableTestTable withInserts(String partition, String fileId, List records) throws Exception { + return (HoodieFlinkWriteableTestTable) withInserts(partition, fileId, records, new org.apache.hudi.client.FlinkTaskContextSupplier(null)); + } + + public HoodieFlinkWriteableTestTable withLogAppends(List records) throws Exception { + for (List groupedRecords: records.stream().collect(Collectors.groupingBy(HoodieRecord::getCurrentLocation)).values()) { + appendRecordsToLogFile(groupedRecords); + } + return this; + } + + private void appendRecordsToLogFile(List groupedRecords) throws Exception { + String partitionPath = groupedRecords.get(0).getPartitionPath(); + HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation(); + try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new Path(basePath, partitionPath)) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId()) + .overBaseCommit(location.getInstantTime()).withFs(fs).build()) { + Map header = new java.util.HashMap<>(); + header.put(HeaderMetadataType.INSTANT_TIME, location.getInstantTime()); + header.put(HeaderMetadataType.SCHEMA, schema.toString()); + logWriter.appendBlock(new HoodieAvroDataBlock(groupedRecords.stream().map(r -> { + try { + GenericRecord val = (GenericRecord) r.getData().getInsertValue(schema).get(); + HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(), r.getPartitionPath(), ""); + return (org.apache.avro.generic.IndexedRecord) val; + } catch (java.io.IOException e) { + LOG.warn("Failed to convert record " + r.toString(), e); + return null; + } + }).collect(Collectors.toList()), header)); + } + } +}