[HUDI-499] Allow update partition path with GLOBAL_BLOOM (#1187)
* Handle partition path update by deleting a record from the old partition and insert into the new one * Add a new configuration "hoodie.bloom.index.update.partition.path" to enable the behavior * Add a new unit test case for global bloom index
This commit is contained in:
@@ -77,6 +77,17 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
||||
public static final String BLOOM_INDEX_INPUT_STORAGE_LEVEL = "hoodie.bloom.index.input.storage.level";
|
||||
public static final String DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
|
||||
|
||||
/**
|
||||
* Only applies if index type is GLOBAL_BLOOM.
|
||||
* <p>
|
||||
* When set to true, an update to a record with a different partition from its existing one
|
||||
* will insert the record to the new partition and delete it from the old partition.
|
||||
* <p>
|
||||
* When set to false, a record will be updated to the old partition.
|
||||
*/
|
||||
public static final String BLOOM_INDEX_UPDATE_PARTITION_PATH = "hoodie.bloom.index.update.partition.path";
|
||||
public static final String DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH = "false";
|
||||
|
||||
private HoodieIndexConfig(Properties props) {
|
||||
super(props);
|
||||
}
|
||||
@@ -176,6 +187,11 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withBloomIndexUpdatePartitionPath(boolean updatePartitionPath) {
|
||||
props.setProperty(BLOOM_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodieIndexConfig build() {
|
||||
HoodieIndexConfig config = new HoodieIndexConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(INDEX_TYPE_PROP), INDEX_TYPE_PROP, DEFAULT_INDEX_TYPE);
|
||||
@@ -190,6 +206,8 @@ public class HoodieIndexConfig extends DefaultHoodieConfig {
|
||||
DEFAULT_BLOOM_INDEX_USE_CACHING);
|
||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_INPUT_STORAGE_LEVEL), BLOOM_INDEX_INPUT_STORAGE_LEVEL,
|
||||
DEFAULT_BLOOM_INDEX_INPUT_STORAGE_LEVEL);
|
||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_UPDATE_PARTITION_PATH),
|
||||
BLOOM_INDEX_UPDATE_PARTITION_PATH, DEFAULT_BLOOM_INDEX_UPDATE_PARTITION_PATH);
|
||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_TREE_BASED_FILTER_PROP),
|
||||
BLOOM_INDEX_TREE_BASED_FILTER_PROP, DEFAULT_BLOOM_INDEX_TREE_BASED_FILTER);
|
||||
setDefaultOnCondition(props, !props.containsKey(BLOOM_INDEX_BUCKETIZED_CHECKING_PROP),
|
||||
|
||||
@@ -431,6 +431,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return StorageLevel.fromString(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL));
|
||||
}
|
||||
|
||||
public boolean getBloomIndexUpdatePartitionPath() {
|
||||
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
|
||||
}
|
||||
|
||||
/**
|
||||
* storage properties.
|
||||
*/
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hudi.index.bloom;
|
||||
|
||||
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
@@ -36,6 +37,8 @@ import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.Optional;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -114,14 +117,28 @@ public class HoodieGlobalBloomIndex<T extends HoodieRecordPayload> extends Hoodi
|
||||
keyLocationPairRDD.mapToPair(p -> new Tuple2<>(p._1.getRecordKey(), new Tuple2<>(p._2, p._1)));
|
||||
|
||||
// Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null), so we do left outer join.
|
||||
return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().map(record -> {
|
||||
return incomingRowKeyRecordPairRDD.leftOuterJoin(existingRecordKeyToRecordLocationHoodieKeyMap).values().flatMap(record -> {
|
||||
final HoodieRecord<T> hoodieRecord = record._1;
|
||||
final Optional<Tuple2<HoodieRecordLocation, HoodieKey>> recordLocationHoodieKeyPair = record._2;
|
||||
if (recordLocationHoodieKeyPair.isPresent()) {
|
||||
// Record key matched to file
|
||||
return getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()), Option.ofNullable(recordLocationHoodieKeyPair.get()._1));
|
||||
if (config.getBloomIndexUpdatePartitionPath()
|
||||
&& !recordLocationHoodieKeyPair.get()._2.getPartitionPath().equals(hoodieRecord.getPartitionPath())) {
|
||||
// Create an empty record to delete the record in the old partition
|
||||
HoodieRecord<T> emptyRecord = new HoodieRecord(recordLocationHoodieKeyPair.get()._2,
|
||||
new EmptyHoodieRecordPayload());
|
||||
// Tag the incoming record for inserting to the new partition
|
||||
HoodieRecord<T> taggedRecord = getTaggedRecord(hoodieRecord, Option.empty());
|
||||
return Arrays.asList(emptyRecord, taggedRecord).iterator();
|
||||
} else {
|
||||
// Ignore the incoming record's partition, regardless of whether it differs from its old partition or not.
|
||||
// When it differs, the record will still be updated at its old partition.
|
||||
return Collections.singletonList(
|
||||
getTaggedRecord(new HoodieRecord<>(recordLocationHoodieKeyPair.get()._2, hoodieRecord.getData()),
|
||||
Option.ofNullable(recordLocationHoodieKeyPair.get()._1))).iterator();
|
||||
}
|
||||
} else {
|
||||
return getTaggedRecord(hoodieRecord, Option.empty());
|
||||
return Collections.singletonList(getTaggedRecord(hoodieRecord, Option.empty())).iterator();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user