[HUDI-1184] Fix the support of hbase index partition path change (#1978)
When the hbase index is used, when the record partition is changed to another partition, the path does not change according to the value of the partition column Co-authored-by: huangjing <huangjing@clinbrain.com>
This commit is contained in:
@@ -102,6 +102,17 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
|
|||||||
public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
|
public static final String HBASE_ZK_PATH_QPS_ROOT = "hoodie.index.hbase.zkpath.qps_root";
|
||||||
public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
|
public static final String DEFAULT_HBASE_ZK_PATH_QPS_ROOT = "/QPS_ROOT";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Only applies if index type is Hbase.
|
||||||
|
* <p>
|
||||||
|
* When set to true, an update to a record with a different partition from its existing one
|
||||||
|
* will insert the record to the new partition and delete it from the old partition.
|
||||||
|
* <p>
|
||||||
|
* When set to false, a record will be updated to the old partition.
|
||||||
|
*/
|
||||||
|
public static final String HBASE_INDEX_UPDATE_PARTITION_PATH = "hoodie.hbase.index.update.partition.path";
|
||||||
|
public static final Boolean DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH = false;
|
||||||
|
|
||||||
public HoodieHBaseIndexConfig(final Properties props) {
|
public HoodieHBaseIndexConfig(final Properties props) {
|
||||||
super(props);
|
super(props);
|
||||||
}
|
}
|
||||||
@@ -196,6 +207,11 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder hbaseIndexUpdatePartitionPath(boolean updatePartitionPath) {
|
||||||
|
props.setProperty(HBASE_INDEX_UPDATE_PARTITION_PATH, String.valueOf(updatePartitionPath));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
|
public Builder withQPSResourceAllocatorType(String qpsResourceAllocatorClass) {
|
||||||
props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
|
props.setProperty(HBASE_INDEX_QPS_ALLOCATOR_CLASS, qpsResourceAllocatorClass);
|
||||||
return this;
|
return this;
|
||||||
@@ -259,6 +275,8 @@ public class HoodieHBaseIndexConfig extends DefaultHoodieConfig {
|
|||||||
HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
|
HOODIE_INDEX_HBASE_ZK_CONNECTION_TIMEOUT_MS, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
|
||||||
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), HBASE_INDEX_QPS_ALLOCATOR_CLASS,
|
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_QPS_ALLOCATOR_CLASS), HBASE_INDEX_QPS_ALLOCATOR_CLASS,
|
||||||
String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
|
String.valueOf(DEFAULT_HBASE_INDEX_QPS_ALLOCATOR_CLASS));
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(HBASE_INDEX_UPDATE_PARTITION_PATH), HBASE_INDEX_UPDATE_PARTITION_PATH,
|
||||||
|
String.valueOf(DEFAULT_HBASE_INDEX_UPDATE_PARTITION_PATH));
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -485,6 +485,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
|
return Integer.parseInt(props.getProperty(HoodieHBaseIndexConfig.HBASE_MAX_QPS_PER_REGION_SERVER_PROP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getHbaseIndexUpdatePartitionPath() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(HoodieHBaseIndexConfig.HBASE_INDEX_UPDATE_PARTITION_PATH));
|
||||||
|
}
|
||||||
|
|
||||||
public int getBloomIndexParallelism() {
|
public int getBloomIndexParallelism() {
|
||||||
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
|
return Integer.parseInt(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM_PROP));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import org.apache.hudi.client.WriteStatus;
|
|||||||
import org.apache.hudi.client.common.HoodieEngineContext;
|
import org.apache.hudi.client.common.HoodieEngineContext;
|
||||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||||
import org.apache.hudi.client.utils.SparkMemoryUtils;
|
import org.apache.hudi.client.utils.SparkMemoryUtils;
|
||||||
|
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
|
||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
@@ -188,6 +189,7 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
|
|||||||
hoodieRecordIterator) -> {
|
hoodieRecordIterator) -> {
|
||||||
|
|
||||||
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
|
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
|
||||||
|
boolean updatePartitionPath = config.getHbaseIndexUpdatePartitionPath();
|
||||||
|
|
||||||
// Grab the global HBase connection
|
// Grab the global HBase connection
|
||||||
synchronized (SparkHoodieHBaseIndex.class) {
|
synchronized (SparkHoodieHBaseIndex.class) {
|
||||||
@@ -205,35 +207,51 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
|
|||||||
statements.add(generateStatement(rec.getRecordKey()));
|
statements.add(generateStatement(rec.getRecordKey()));
|
||||||
currentBatchOfRecords.add(rec);
|
currentBatchOfRecords.add(rec);
|
||||||
// iterator till we reach batch size
|
// iterator till we reach batch size
|
||||||
if (statements.size() >= multiGetBatchSize || !hoodieRecordIterator.hasNext()) {
|
if (hoodieRecordIterator.hasNext() && statements.size() < multiGetBatchSize) {
|
||||||
// get results for batch from Hbase
|
continue;
|
||||||
Result[] results = doGet(hTable, statements);
|
}
|
||||||
// clear statements to be GC'd
|
// get results for batch from Hbase
|
||||||
statements.clear();
|
Result[] results = doGet(hTable, statements);
|
||||||
for (Result result : results) {
|
// clear statements to be GC'd
|
||||||
// first, attempt to grab location from HBase
|
statements.clear();
|
||||||
HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
|
for (Result result : results) {
|
||||||
if (result.getRow() != null) {
|
// first, attempt to grab location from HBase
|
||||||
String keyFromResult = Bytes.toString(result.getRow());
|
HoodieRecord currentRecord = currentBatchOfRecords.remove(0);
|
||||||
String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
|
if (result.getRow() == null) {
|
||||||
String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
|
taggedRecords.add(currentRecord);
|
||||||
String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
|
continue;
|
||||||
|
}
|
||||||
if (checkIfValidCommit(metaClient, commitTs)) {
|
String keyFromResult = Bytes.toString(result.getRow());
|
||||||
currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
|
String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
|
||||||
currentRecord.getData());
|
String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
|
||||||
currentRecord.unseal();
|
String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
|
||||||
currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
|
if (!checkIfValidCommit(metaClient, commitTs)) {
|
||||||
currentRecord.seal();
|
// if commit is invalid, treat this as a new taggedRecord
|
||||||
taggedRecords.add(currentRecord);
|
taggedRecords.add(currentRecord);
|
||||||
// the key from Result and the key being processed should be same
|
continue;
|
||||||
assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
|
}
|
||||||
} else { // if commit is invalid, treat this as a new taggedRecord
|
// check whether to do partition change processing
|
||||||
taggedRecords.add(currentRecord);
|
if (updatePartitionPath && !partitionPath.equals(currentRecord.getPartitionPath())) {
|
||||||
}
|
// delete partition old data record
|
||||||
} else {
|
HoodieRecord emptyRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
|
||||||
taggedRecords.add(currentRecord);
|
new EmptyHoodieRecordPayload());
|
||||||
}
|
emptyRecord.unseal();
|
||||||
|
emptyRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
|
||||||
|
emptyRecord.seal();
|
||||||
|
// insert partition new data record
|
||||||
|
currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), currentRecord.getPartitionPath()),
|
||||||
|
currentRecord.getData());
|
||||||
|
taggedRecords.add(emptyRecord);
|
||||||
|
taggedRecords.add(currentRecord);
|
||||||
|
} else {
|
||||||
|
currentRecord = new HoodieRecord(new HoodieKey(currentRecord.getRecordKey(), partitionPath),
|
||||||
|
currentRecord.getData());
|
||||||
|
currentRecord.unseal();
|
||||||
|
currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
|
||||||
|
currentRecord.seal();
|
||||||
|
taggedRecords.add(currentRecord);
|
||||||
|
// the key from Result and the key being processed should be same
|
||||||
|
assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ package org.apache.hudi.index.hbase;
|
|||||||
|
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
|
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.common.model.HoodieTableType;
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
@@ -58,6 +60,7 @@ import org.junit.jupiter.api.Test;
|
|||||||
import org.junit.jupiter.api.TestMethodOrder;
|
import org.junit.jupiter.api.TestMethodOrder;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -168,6 +171,59 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testTagLocationAndPartitionPathUpdate() throws Exception {
|
||||||
|
final String newCommitTime = "001";
|
||||||
|
final int numRecords = 10;
|
||||||
|
final String oldPartitionPath = "1970/01/01";
|
||||||
|
final String emptyHoodieRecordPayloadClasssName = EmptyHoodieRecordPayload.class.getName();
|
||||||
|
|
||||||
|
List<HoodieRecord> newRecords = dataGen.generateInserts(newCommitTime, numRecords);
|
||||||
|
List<HoodieRecord> oldRecords = new LinkedList();
|
||||||
|
for (HoodieRecord newRecord: newRecords) {
|
||||||
|
HoodieKey key = new HoodieKey(newRecord.getRecordKey(), oldPartitionPath);
|
||||||
|
HoodieRecord hoodieRecord = new HoodieRecord(key, newRecord.getData());
|
||||||
|
oldRecords.add(hoodieRecord);
|
||||||
|
}
|
||||||
|
|
||||||
|
JavaRDD<HoodieRecord> newWriteRecords = jsc().parallelize(newRecords, 1);
|
||||||
|
JavaRDD<HoodieRecord> oldWriteRecords = jsc().parallelize(oldRecords, 1);
|
||||||
|
|
||||||
|
HoodieWriteConfig config = getConfig(true);
|
||||||
|
SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(getConfig(true));
|
||||||
|
|
||||||
|
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
|
||||||
|
// allowed path change test
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
HoodieTable hoodieTable = HoodieSparkTable.create(config, context, metaClient);
|
||||||
|
|
||||||
|
JavaRDD<HoodieRecord> oldHoodieRecord = index.tagLocation(oldWriteRecords, context, hoodieTable);
|
||||||
|
assertEquals(0, oldHoodieRecord.filter(record -> record.isCurrentLocationKnown()).count());
|
||||||
|
writeClient.startCommitWithTime(newCommitTime);
|
||||||
|
JavaRDD<WriteStatus> writeStatues = writeClient.upsert(oldWriteRecords, newCommitTime);
|
||||||
|
writeClient.commit(newCommitTime, writeStatues);
|
||||||
|
assertNoWriteErrors(writeStatues.collect());
|
||||||
|
index.updateLocation(writeStatues, context, hoodieTable);
|
||||||
|
|
||||||
|
metaClient = HoodieTableMetaClient.reload(metaClient);
|
||||||
|
hoodieTable = HoodieSparkTable.create(config, context, metaClient);
|
||||||
|
List<HoodieRecord> taggedRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
|
||||||
|
assertEquals(numRecords * 2L, taggedRecords.stream().count());
|
||||||
|
// Verify the number of deleted records
|
||||||
|
assertEquals(numRecords, taggedRecords.stream().filter(record -> record.getKey().getPartitionPath().equals(oldPartitionPath)
|
||||||
|
&& record.getData().getClass().getName().equals(emptyHoodieRecordPayloadClasssName)).count());
|
||||||
|
// Verify the number of inserted records
|
||||||
|
assertEquals(numRecords, taggedRecords.stream().filter(record -> !record.getKey().getPartitionPath().equals(oldPartitionPath)).count());
|
||||||
|
|
||||||
|
// not allowed path change test
|
||||||
|
index = new SparkHoodieHBaseIndex<>(getConfig(false));
|
||||||
|
List<HoodieRecord> notAllowPathChangeRecords = index.tagLocation(newWriteRecords, context, hoodieTable).collect();
|
||||||
|
assertEquals(numRecords, notAllowPathChangeRecords.stream().count());
|
||||||
|
assertEquals(numRecords, taggedRecords.stream().filter(hoodieRecord -> hoodieRecord.isCurrentLocationKnown()
|
||||||
|
&& hoodieRecord.getKey().getPartitionPath().equals(oldPartitionPath)).count());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTagLocationAndDuplicateUpdate() throws Exception {
|
public void testTagLocationAndDuplicateUpdate() throws Exception {
|
||||||
final String newCommitTime = "001";
|
final String newCommitTime = "001";
|
||||||
@@ -454,14 +510,18 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private HoodieWriteConfig getConfig() {
|
private HoodieWriteConfig getConfig() {
|
||||||
return getConfigBuilder(100).build();
|
return getConfigBuilder(100, false).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
|
private HoodieWriteConfig getConfig(int hbaseIndexBatchSize) {
|
||||||
return getConfigBuilder(hbaseIndexBatchSize).build();
|
return getConfigBuilder(hbaseIndexBatchSize, false).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize) {
|
private HoodieWriteConfig getConfig(boolean updatePartitionPath) {
|
||||||
|
return getConfigBuilder(100, updatePartitionPath).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
private HoodieWriteConfig.Builder getConfigBuilder(int hbaseIndexBatchSize, boolean updatePartitionPath) {
|
||||||
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
return HoodieWriteConfig.newBuilder().withPath(basePath()).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
|
||||||
.withParallelism(1, 1).withDeleteParallelism(1)
|
.withParallelism(1, 1).withDeleteParallelism(1)
|
||||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024)
|
||||||
@@ -475,6 +535,7 @@ public class TestHBaseIndex extends FunctionalTestHarness {
|
|||||||
.hbaseIndexPutBatchSizeAutoCompute(true)
|
.hbaseIndexPutBatchSizeAutoCompute(true)
|
||||||
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
|
.hbaseZkZnodeParent(hbaseConfig.get("zookeeper.znode.parent", ""))
|
||||||
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
|
.hbaseZkQuorum(hbaseConfig.get("hbase.zookeeper.quorum")).hbaseTableName(TABLE_NAME)
|
||||||
|
.hbaseIndexUpdatePartitionPath(updatePartitionPath)
|
||||||
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
|
.hbaseIndexGetBatchSize(hbaseIndexBatchSize).build())
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user