[HUDI-4210] Create custom hbase index to solve data skew issue on hbase regions (#5797)
This commit is contained in:
@@ -183,6 +183,10 @@ public class HoodieHBaseIndexConfig extends HoodieConfig {
|
||||
.noDefaultValue()
|
||||
.withDocumentation("The value of hbase.master.kerberos.principal in hbase cluster.");
|
||||
|
||||
public static final ConfigProperty<Integer> BUCKET_NUMBER = ConfigProperty
|
||||
.key("hoodie.index.hbase.bucket.number")
|
||||
.defaultValue(8)
|
||||
.withDocumentation("Only applicable when using RebalancedSparkHoodieHBaseIndex, same as hbase regions count can get the best performance");
|
||||
|
||||
/**
|
||||
* @deprecated Use {@link #ZKQUORUM} and its methods instead
|
||||
|
||||
@@ -1553,6 +1553,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
||||
return getBooleanOrDefault(HoodieHBaseIndexConfig.UPDATE_PARTITION_PATH_ENABLE);
|
||||
}
|
||||
|
||||
public int getHBaseIndexRegionCount() {
|
||||
return getInt(HoodieHBaseIndexConfig.BUCKET_NUMBER);
|
||||
}
|
||||
|
||||
public int getBloomIndexParallelism() {
|
||||
return getInt(HoodieIndexConfig.BLOOM_INDEX_PARALLELISM);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.index.hbase;
|
||||
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
/**
|
||||
* Extends {@link SparkHoodieHBaseIndex}, add random prefix to key for avoiding data skew issue in hbase regions.
|
||||
*/
|
||||
public class RebalancedSparkHoodieHBaseIndex extends SparkHoodieHBaseIndex {
|
||||
|
||||
public RebalancedSparkHoodieHBaseIndex(HoodieWriteConfig config) {
|
||||
super(config);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getHBaseKey(String originalKey) {
|
||||
int bucket = Math.abs(originalKey.hashCode()) % config.getHBaseIndexRegionCount();
|
||||
String bucketStr = String.format("%0" + String.valueOf(config.getHBaseIndexRegionCount() - 1).length() + "d", bucket);
|
||||
return bucketStr + originalKey;
|
||||
}
|
||||
}
|
||||
@@ -205,7 +205,7 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
|
||||
}
|
||||
|
||||
private Get generateStatement(String key) throws IOException {
|
||||
return new Get(Bytes.toBytes(key)).setMaxVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
|
||||
return new Get(Bytes.toBytes(getHBaseKey(key))).setMaxVersions(1).addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN)
|
||||
.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
|
||||
}
|
||||
|
||||
@@ -213,6 +213,10 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
|
||||
return generateStatement(key).setTimeRange(startTime, endTime);
|
||||
}
|
||||
|
||||
protected String getHBaseKey(String key) {
|
||||
return key;
|
||||
}
|
||||
|
||||
private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
|
||||
HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
|
||||
// Check if the last commit ts for this row is 1) present in the timeline or
|
||||
@@ -354,14 +358,14 @@ public class SparkHoodieHBaseIndex extends HoodieIndex<Object, Object> {
|
||||
// This is an update, no need to update index
|
||||
continue;
|
||||
}
|
||||
Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
|
||||
Put put = new Put(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
|
||||
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, Bytes.toBytes(loc.get().getInstantTime()));
|
||||
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, Bytes.toBytes(loc.get().getFileId()));
|
||||
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, Bytes.toBytes(rec.getPartitionPath()));
|
||||
mutations.add(put);
|
||||
} else {
|
||||
// Delete existing index for a deleted record
|
||||
Delete delete = new Delete(Bytes.toBytes(rec.getRecordKey()));
|
||||
Delete delete = new Delete(Bytes.toBytes(getHBaseKey(rec.getRecordKey())));
|
||||
mutations.add(delete);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user