diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
index 709f08c31..5d79776b1 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieHBaseIndexConfig.java
@@ -102,6 +102,17 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
   public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
   public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
 
+  /**
+   * Only applies if index type is HBASE.
+   * <p>
+   * When set to true, an update to a record with a different partition from its existing one
+   * will insert the record into the new partition and delete it from the old partition.
+   * <p>
+   * When set to false, the record will be updated in its old partition.
+   */
+  public static final String HBASE_INDEX_UPDATE_PARTITION_PATH = "hoodie.hbase.index.update.partition.path";
+  public static final Boolean DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH = false;
+
   public HoodieHBaseIndexConfig(final Properties props) {
     super(props);
   }
@@ -196,6 +207,11 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder hbaseIndexUpdatePartitionPath(boolean updatePartitionPath) {
+      props.setProperty(HBASE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
+      return this;
+    }
+
     public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
       props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
       return this;
@@ -259,6 +275,8 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
           HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
       setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), HBASE_INDEX_QPS_ALLOCATOR_CLASS,
           String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
+      setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_UPDATE_PARTITION_PATH), HBASE_INDEX_UPDATE_PARTITION_PATH,
+          String.valueOf(DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH));
       return config;
     }
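The new option wires in through the existing index-config builder. As a quick usage sketch (not part of the patch): the wrapper class and method below are hypothetical, the connection values are placeholders, and the chain otherwise mirrors the builder calls used in the test changes at the end of this diff.

```java
import org.apache.hudi.config.HoodieHBaseIndexConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class UpdatePartitionPathExample {
  // Build a write config with the HBase index and the new flag enabled.
  public static HoodieWriteConfig buildConfig(String basePath, String schema,
      String zkQuorum, String indexTableName) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(schema)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)
            .withHBaseIndexConfig(HoodieHBaseIndexConfig.newBuilder()
                .hbaseZkQuorum(zkQuorum)
                .hbaseTableName(indexTableName)
                .hbaseIndexUpdatePartitionPath(true) // new flag from this patch
                .build())
            .build())
        .build();
  }
}
```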
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index bb6560057..42d3e2b40 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -485,6 +485,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
   }
 
+  public boolean getHbaseIndexUpdatePartitionPath() {
+    return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_UPDATE_PARTITION_PATH));
+  }
+
   public int getBloomIndexParallelism() {
     return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
   }
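A side note on the getter above (an observation, not part of the patch): it reads the property without a null guard, which is safe because `Boolean.parseBoolean` accepts null and returns false, matching `DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH`. A minimal demonstration:

```java
public class ParseBooleanDefault {
  public static void main(String[] args) {
    // Boolean.parseBoolean(null) returns false (no NPE), so a config built
    // without the new default still falls back to the old behavior of
    // updating the record in its existing partition.
    System.out.println(Boolean.parseBoolean(null)); // prints: false
  }
}
```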
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
index 21efd9b27..072c71cc8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/SparkHoodieHBaseIndex.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMemoryUtils;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -188,6 +189,7 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends Spark
             hoodieRecordIterator) -> {
 
           int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
+          boolean updatePartitionPath = config.getHbaseIndexUpdatePartitionPath();
 
           // Grab the global HBase connection
           synchronized (SparkHoodieHBaseIndex.class) {
@@ -205,35 +207,51 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends Spark
              statements.add(generateStatement(rec.getRecordKey()));
              currentBatchOfRecords.add(rec);
              // iterator till we reach batch size
-              if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
-                // get results for batch from Hbase
-                Result[] results = doGet(hTable, statements);
-                // clear statements to be GC'd
-                statements.clear();
-                for (Result result : results) {
-                  // first, attempt to grab location from HBase
-                  HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
-                  if (result.getRow() != null) {
-                    String keyFromResult = Bytes.toString(result.getRow());
-                    String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
-                    String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
-                    String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
-
-                    if (checkIfValidCommit(metaClient, commitTs)) {
-                      currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
-                          currentRecord.getData());
-                      currentRecord.unseal();
-                      currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
-                      currentRecord.seal();
-                      taggedRecords.add(currentRecord);
-                      // the key from Result and the key being processed should be same
-                      assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
-                    } else { // if commit is invalid, treat this as a new taggedRecord
-                      taggedRecords.add(currentRecord);
-                    }
-                  } else {
-                    taggedRecords.add(currentRecord);
-                  }
+              if (hoodieRecordIterator.hasNext() && statements.size() < multiGetBatchSize) {
+                continue;
+              }
+              // get results for batch from Hbase
+              Result[] results = doGet(hTable, statements);
+              // clear statements to be GC'd
+              statements.clear();
+              for (Result result : results) {
+                // first, attempt to grab location from HBase
+                HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
+                if (result.getRow() == null) {
+                  taggedRecords.add(currentRecord);
+                  continue;
+                }
+                String keyFromResult = Bytes.toString(result.getRow());
+                String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
+                String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
+                String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
+                if (!checkIfValidCommit(metaClient, commitTs)) {
+                  // if commit is invalid, treat this as a new taggedRecord
+                  taggedRecords.add(currentRecord);
+                  continue;
+                }
+                // check whether the partition path changed and needs special handling
+                if (updatePartitionPath && !partitionPath.equals(currentRecord.getPartitionPath())) {
+                  // delete the record from the old partition
+                  HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
+                      new EmptyHoodieRecordPayload());
+                  emptyRecord.unseal();
+                  emptyRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+                  emptyRecord.seal();
+                  // insert the record into the new partition
+                  currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()),
+                      currentRecord.getData());
+                  taggedRecords.add(emptyRecord);
+                  taggedRecords.add(currentRecord);
+                } else {
+                  currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
+                      currentRecord.getData());
+                  currentRecord.unseal();
+                  currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+                  currentRecord.seal();
+                  taggedRecords.add(currentRecord);
+                  // the key from Result and the key being processed should be the same
+                  assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
                }
              }
            }
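To summarize the new branch above: when the flag is on and the incoming record's partition differs from the one stored in HBase, tagging fans the record out into a delete in the old partition plus an untagged insert in the new one. Below is a self-contained sketch of that fan-out; the class, method, and parameters are hypothetical stand-ins for values that come from the HBase `Result` and the incoming record.

```java
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;

public class PartitionMoveSketch {
  // Mirrors the updatePartitionPath branch: one tagged delete, one untagged insert.
  public static HoodieRecord[] fanOut(HoodieRecord incoming, String oldPartition,
      String existingCommitTs, String existingFileId) {
    // Delete the copy living in the old partition: an EmptyHoodieRecordPayload
    // tagged with the record's existing location.
    HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(incoming.getRecordKey(), oldPartition),
        new EmptyHoodieRecordPayload());
    emptyRecord.unseal();
    emptyRecord.setCurrentLocation(new HoodieRecordLocation(existingCommitTs, existingFileId));
    emptyRecord.seal();
    // Re-key the incoming data under its new partition and leave it untagged,
    // so the upsert routes it as an insert there.
    HoodieRecord insert = new HoodieRecord(new HoodieKey(incoming.getRecordKey(), incoming.getPartitionPath()),
        incoming.getData());
    return new HoodieRecord[] {emptyRecord, insert};
  }
}
```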
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
index b85851663..b74daad40 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestHBaseIndex.java
@@ -20,6 +20,8 @@ package org.apache.hudi.index.hbase;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -58,6 +60,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestMethodOrder;
 
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -168,6 +171,59 @@ public class TestHBaseIndex extends FunctionalTestHarness {
     }
   }
 
+  @Test
+  public void testTagLocationAndPartitionPathUpdate() throws Exception {
+    final String newCommitTime = "001";
+    final int numRecords = 10;
+    final String oldPartitionPath = "1970/01/01";
+    final String emptyHoodieRecordPayloadClassName = EmptyHoodieRecordPayload.class.getName();
+
+    List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, numRecords);
+    List<HoodieRecord> oldRecords = new LinkedList<>();
+    for (HoodieRecord newRecord : newRecords) {
+      HoodieKey key = new HoodieKey(newRecord.getRecordKey(), oldPartitionPath);
+      HoodieRecord hoodieRecord = new HoodieRecord(key, newRecord.getData());
+      oldRecords.add(hoodieRecord);
+    }
+
+    JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
+    JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
+
+    HoodieWriteConfig config = getConfig(true);
+    SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex<>(getConfig(true));
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
+      // allowed path change test
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+
+      JavaRDD<HoodieRecord> oldHoodieRecord = index.tagLocation(oldWriteRecords, context, hoodieTable);
+      assertEquals(0, oldHoodieRecord.filter(record -> record.isCurrentLocationKnown()).count());
+      writeClient.startCommitWithTime(newCommitTime);
+      JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(oldWriteRecords, newCommitTime);
+      writeClient.commit(newCommitTime, writeStatuses);
+      assertNoWriteErrors(writeStatuses.collect());
+      index.updateLocation(writeStatuses, context, hoodieTable);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+      hoodieTable = HoodieSparkTable.create(config, context, metaClient);
+      List<HoodieRecord> taggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+      assertEquals(numRecords * 2L, taggedRecords.stream().count());
+      // Verify the number of deleted records
+      assertEquals(numRecords, taggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
+          && record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClassName)).count());
+      // Verify the number of inserted records
+      assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+
+      // not allowed path change test
+      index = new SparkHoodieHBaseIndex<>(getConfig(false));
+      List<HoodieRecord> notAllowPathChangeRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
+      assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
+      assertEquals(numRecords, notAllowPathChangeRecords.stream().filter(hoodieRecord -> hoodieRecord.isCurrentLocationKnown()
+          && hoodieRecord.getKey().getPartitionPath().equals(oldPartitionPath)).count());
+    }
+  }
+
   @Test
   public void testTagLocationAndDuplicateUpdate() throws Exception {
     final String newCommitTime = "001";
@@ -454,14 +510,18 @@ public class TestHBaseIndex extends FunctionalTestHarness {
   }
 
   private HoodieWriteConfig getConfig() {
-    return getConfigBuilder(100).build();
+    return getConfigBuilder(100, false).build();
   }
 
   private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
-    return getConfigBuilder(hbaseIndexBatchSize).build();
+    return getConfigBuilder(hbaseIndexBatchSize, false).build();
  }
 
-  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
+  private HoodieWriteConfig getConfig(boolean updatePartitionPath) {
+    return getConfigBuilder(100, updatePartitionPath).build();
+  }
+
+  private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
     return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(1, 1).withDeleteParallelism(1)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
@@ -475,6 +535,7 @@ public class TestHBaseIndex extends FunctionalTestHarness {
             .hbaseIndexPutBatchSizeAutoCompute(true)
             .hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
             .hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
+            .hbaseIndexUpdatePartitionPath(updatePartitionPath)
            .hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
         .build());
   }
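Finally, the flag can also be supplied as a plain config key on the Spark datasource write path. A sketch under stated assumptions: the class, dataset, and table path are hypothetical; `hoodie.index.type` and `hoodie.datasource.write.operation` are existing Hudi option keys; the HBase connection options (ZooKeeper quorum, index table name) are omitted for brevity.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class DataSourceWriteSketch {
  // Upsert with the HBase index moving records whose partition value changed.
  public static void upsertWithPartitionMove(Dataset<Row> df, String tablePath) {
    df.write().format("hudi")
        .option("hoodie.datasource.write.operation", "upsert")
        .option("hoodie.index.type", "HBASE")
        .option("hoodie.hbase.index.update.partition.path", "true") // key added in this patch
        .mode(SaveMode.Append)
        .save(tablePath);
  }
}
```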