[HUDI-1347] Fix Hbase index to make rollback synchronous (via config) (#2188)
Co-authored-by: huangjing <huangjing@clinbrain.com> Co-authored-by: Sivabalan Narayanan <sivabala@uber.com>
This commit is contained in:
@@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hudi.index.hbase;
|
||||
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hudi.client.WriteStatus;
|
||||
import org.apache.hudi.client.common.HoodieSparkEngineContext;
|
||||
import org.apache.hudi.client.utils.SparkMemoryUtils;
|
||||
@@ -28,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.RateLimiter;
|
||||
@@ -67,6 +70,7 @@ import org.joda.time.DateTime;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
@@ -181,6 +185,10 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
|
||||
.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN).addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN);
|
||||
}
|
||||
|
||||
private Get generateStatement(String key, long startTime, long endTime) throws IOException {
|
||||
return generateStatement(key).setTimeRange(startTime, endTime);
|
||||
}
|
||||
|
||||
private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
|
||||
HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
|
||||
// Check if the last commit ts for this row is 1) present in the timeline or
|
||||
@@ -537,7 +545,72 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload> extends SparkH
|
||||
|
||||
@Override
|
||||
public boolean rollbackCommit(String instantTime) {
|
||||
// Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
|
||||
int multiGetBatchSize = config.getHbaseIndexGetBatchSize();
|
||||
boolean rollbackSync = config.getHBaseIndexRollbackSync();
|
||||
|
||||
if (!config.getHBaseIndexRollbackSync()) {
|
||||
// Default Rollback in HbaseIndex is managed via method {@link #checkIfValidCommit()}
|
||||
return true;
|
||||
}
|
||||
|
||||
synchronized (SparkHoodieHBaseIndex.class) {
|
||||
if (hbaseConnection == null || hbaseConnection.isClosed()) {
|
||||
hbaseConnection = getHBaseConnection();
|
||||
}
|
||||
}
|
||||
try (HTable hTable = (HTable) hbaseConnection.getTable(TableName.valueOf(tableName));
|
||||
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
|
||||
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);
|
||||
|
||||
Long rollbackTime = HoodieActiveTimeline.COMMIT_FORMATTER.parse(instantTime).getTime();
|
||||
Long currentTime = new Date().getTime();
|
||||
Scan scan = new Scan();
|
||||
scan.addFamily(SYSTEM_COLUMN_FAMILY);
|
||||
scan.setTimeRange(rollbackTime, currentTime);
|
||||
ResultScanner scanner = hTable.getScanner(scan);
|
||||
Iterator<Result> scannerIterator = scanner.iterator();
|
||||
|
||||
List<Get> statements = new ArrayList<>();
|
||||
List<Result> currentVersionResults = new ArrayList<Result>();
|
||||
List<Mutation> mutations = new ArrayList<>();
|
||||
while (scannerIterator.hasNext()) {
|
||||
Result result = scannerIterator.next();
|
||||
currentVersionResults.add(result);
|
||||
statements.add(generateStatement(Bytes.toString(result.getRow()), 0L, rollbackTime - 1));
|
||||
|
||||
if (scannerIterator.hasNext() && statements.size() < multiGetBatchSize) {
|
||||
continue;
|
||||
}
|
||||
Result[] lastVersionResults = hTable.get(statements);
|
||||
for (int i = 0; i < lastVersionResults.length; i++) {
|
||||
Result lastVersionResult = lastVersionResults[i];
|
||||
if (null == lastVersionResult.getRow() && rollbackSync) {
|
||||
Result currentVersionResult = currentVersionResults.get(i);
|
||||
Delete delete = new Delete(currentVersionResult.getRow());
|
||||
mutations.add(delete);
|
||||
}
|
||||
|
||||
if (null != lastVersionResult.getRow()) {
|
||||
String oldPath = new String(lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
|
||||
String nowPath = new String(currentVersionResults.get(i).getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
|
||||
if (!oldPath.equals(nowPath) || rollbackSync) {
|
||||
Put put = new Put(lastVersionResult.getRow());
|
||||
put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
|
||||
put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
|
||||
put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN, lastVersionResult.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
|
||||
mutations.add(put);
|
||||
}
|
||||
}
|
||||
}
|
||||
doMutations(mutator, mutations, limiter);
|
||||
currentVersionResults.clear();
|
||||
statements.clear();
|
||||
mutations.clear();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("hbase index roll back failed", e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user