diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java index d39fae1e1..db834980a 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java @@ -77,6 +77,17 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level"; public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER"; + /** + * Only applies if index type is GLOBAL_BLOOM. + *
<p>
+ * When set to true, an update to a record with a different partition from its existing one + * will insert the record to the new partition and delete it from the old partition. + *
<p>
+ * When set to false, a record will be updated to the old partition. + */ + public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path"; + public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false"; + private HoodieIndexConfig(Properties props) { super(props); } @@ -176,6 +187,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { return this; } + public Builder withBloomIndexUpdatePartitionPath(boolean updatePartitionPath) { + props.setProperty(BLOOM_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath)); + return this; + } + public HoodieIndexConfig build() { HoodieIndexConfig config = new HoodieIndexConfig(props); setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE); @@ -190,6 +206,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig { DEFAULT_BLOOM_INDEX_USE_CACHING); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL), BLOOM_INDEX_INPUT_STORAGE_LEVEL, DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL); + setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_UPDATE_PARTITION_PATH), + BLOOM_INDEX_UPDATE_PARTITION_PATH, DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP), BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER); setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP), diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 7fc0680d6..642384b75 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -431,6 +431,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return 
StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL)); } + public boolean getBloomIndexUpdatePartitionPath() { + return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH)); + } + /** * storage properties. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java index be6f52474..ba8976b9c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java +++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java @@ -18,6 +18,7 @@ package org.apache.hudi.index.bloom; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -36,6 +37,8 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.Optional; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -114,14 +117,28 @@ public class HoodieGlobalBloomIndex extends Hoodi keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1))); // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join. 
- return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> { + return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> { final HoodieRecord hoodieRecord = record._1; final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2; if (recordLocationHoodieKeyPair.isPresent()) { // Record key matched to file - return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1)); + if (config.getBloomIndexUpdatePartitionPath() + && !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) { + // Create an empty record to delete the record in the old partition + HoodieRecord emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2, + new EmptyHoodieRecordPayload()); + // Tag the incoming record for inserting to the new partition + HoodieRecord taggedRecord = getTaggedRecord(hoodieRecord, Option.empty()); + return Arrays.asList(emptyRecord, taggedRecord).iterator(); + } else { + // Ignore the incoming record's partition, regardless of whether it differs from its old partition or not. + // When it differs, the record will still be updated at its old partition.
+ return Collections.singletonList( + getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), + Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator(); + } } else { - return getTaggedRecord(hoodieRecord, Option.empty()); + return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator(); } }); } diff --git a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java index c605654b7..5e4e21bb3 100644 --- a/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java +++ b/hudi-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java @@ -21,6 +21,7 @@ package org.apache.hudi.index.bloom; import org.apache.hudi.HoodieClientTestHarness; import org.apache.hudi.common.HoodieClientTestUtils; import org.apache.hudi.common.TestRawTripPayload; +import org.apache.hudi.common.model.EmptyHoodieRecordPayload; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; @@ -28,6 +29,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.HoodieAvroUtils; +import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.HoodieTable; @@ -56,6 +58,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { @@ -297,6 +300,92 @@ public class TestHoodieGlobalBloomIndex extends HoodieClientTestHarness { } } + @Test + public void 
testTagLocationWhenShouldUpdatePartitionPath() throws Exception { + HoodieWriteConfig config = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .withIndexConfig(HoodieIndexConfig.newBuilder().withBloomIndexUpdatePartitionPath(true).build()) + .build(); + HoodieGlobalBloomIndex index = new HoodieGlobalBloomIndex(config); + + // Create the original partition, and put a record, along with the meta file + // "2016/01/31": 1 file (1_0_20160131101010.parquet) + new File(basePath + "/2016/01/31").mkdirs(); + new File(basePath + "/2016/01/31/" + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE).createNewFile(); + + // this record will be saved in table and will be tagged to an empty record + TestRawTripPayload originalPayload = + new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord originalRecord = + new HoodieRecord(new HoodieKey(originalPayload.getRowKey(), originalPayload.getPartitionPath()), + originalPayload); + + /* + This record has the same record key as originalRecord but different time so different partition + Because GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true, + globalBloomIndex should + - tag the original partition of the originalRecord to an empty record for deletion, and + - tag the new partition of the incomingRecord + */ + TestRawTripPayload incomingPayload = + new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-02-31T03:16:41.415Z\",\"number\":12}"); + HoodieRecord incomingRecord = + new HoodieRecord(new HoodieKey(incomingPayload.getRowKey(), incomingPayload.getPartitionPath()), + incomingPayload); + + /* + This record has the same record key as originalRecord and the same partition + Though GLOBAL_BLOOM_INDEX_SHOULD_UPDATE_PARTITION_PATH = true, + globalBloomIndex should just tag the original partition + */ + TestRawTripPayload incomingPayloadSamePartition = + new TestRawTripPayload("{\"_row_key\":\"000\",\"time\":\"2016-01-31T04:16:41.415Z\",\"number\":15}"); + 
HoodieRecord incomingRecordSamePartition = + new HoodieRecord( + new HoodieKey(incomingPayloadSamePartition.getRowKey(), incomingPayloadSamePartition.getPartitionPath()), + incomingPayloadSamePartition); + + HoodieClientTestUtils + .writeParquetFile(basePath, "2016/01/31", Collections.singletonList(originalRecord), schema, null, false); + + metaClient = HoodieTableMetaClient.reload(metaClient); + HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); + + // Add some commits + new File(basePath + "/.hoodie").mkdirs(); + + // test against incoming record with a different partition + JavaRDD recordRDD = jsc.parallelize(Collections.singletonList(incomingRecord)); + JavaRDD taggedRecordRDD = index.tagLocation(recordRDD, jsc, table); + + assertEquals(2, taggedRecordRDD.count()); + for (HoodieRecord record : taggedRecordRDD.collect()) { + switch (record.getPartitionPath()) { + case "2016/01/31": + assertEquals("000", record.getRecordKey()); + assertTrue(record.getData() instanceof EmptyHoodieRecordPayload); + break; + case "2016/02/31": + assertEquals("000", record.getRecordKey()); + assertEquals(incomingPayload.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData()); + break; + default: + fail(String.format("Should not get partition path: %s", record.getPartitionPath())); + } + } + + // test against incoming record with the same partition + JavaRDD recordRDDSamePartition = jsc + .parallelize(Collections.singletonList(incomingRecordSamePartition)); + JavaRDD taggedRecordRDDSamePartition = index.tagLocation(recordRDDSamePartition, jsc, table); + + assertEquals(1, taggedRecordRDDSamePartition.count()); + HoodieRecord record = taggedRecordRDDSamePartition.first(); + assertEquals("000", record.getRecordKey()); + assertEquals("2016/01/31", record.getPartitionPath()); + assertEquals(incomingPayloadSamePartition.getJsonData(), ((TestRawTripPayload) record.getData()).getJsonData()); + } + // convert list to map to avoid sorting order dependencies 
private Map toFileMap(List> filesList) { Map filesMap = new HashMap<>();