[HUDI-2917] rollback insert data appended to log file when using Hbase Index (#4840)

Co-authored-by: guanziyue <guanziyue@gmail.com>
Authored by: Sivabalan Narayanan
Date: 2022-02-28 08:13:17 -05:00
Committed by: GitHub
parent 193215201c
commit 4a59876c8b
11 changed files with 340 additions and 61 deletions

File: WorkloadProfile.java

@@ -33,9 +33,14 @@ import java.util.Set;
public class WorkloadProfile implements Serializable {
/**
- * Computed workload profile.
+ * Computed workload stats.
*/
- protected final HashMap<String, WorkloadStat> partitionPathStatMap;
+ protected final HashMap<String, WorkloadStat> inputPartitionPathStatMap;
+ /**
+ * Execution/Output workload stats
+ */
+ protected final HashMap<String, WorkloadStat> outputPartitionPathStatMap;
/**
* Global workloadStat.
@@ -47,13 +52,21 @@ public class WorkloadProfile implements Serializable {
*/
private WriteOperationType operationType;
+ private final boolean hasOutputWorkLoadStats;
public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile) {
- this.partitionPathStatMap = profile.getLeft();
- this.globalStat = profile.getRight();
+ this(profile, false);
}
- public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType) {
- this(profile);
+ public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, boolean hasOutputWorkLoadStats) {
+ this.inputPartitionPathStatMap = profile.getLeft();
+ this.globalStat = profile.getRight();
+ this.outputPartitionPathStatMap = new HashMap<>();
+ this.hasOutputWorkLoadStats = hasOutputWorkLoadStats;
+ }
+ public WorkloadProfile(Pair<HashMap<String, WorkloadStat>, WorkloadStat> profile, WriteOperationType operationType, boolean hasOutputWorkLoadStats) {
+ this(profile, hasOutputWorkLoadStats);
this.operationType = operationType;
}
@@ -62,15 +75,37 @@ public class WorkloadProfile implements Serializable {
}
public Set<String> getPartitionPaths() {
- return partitionPathStatMap.keySet();
+ return inputPartitionPathStatMap.keySet();
}
- public HashMap<String, WorkloadStat> getPartitionPathStatMap() {
- return partitionPathStatMap;
+ public Set<String> getOutputPartitionPaths() {
+ return hasOutputWorkLoadStats ? outputPartitionPathStatMap.keySet() : inputPartitionPathStatMap.keySet();
}
public HashMap<String, WorkloadStat> getInputPartitionPathStatMap() {
return inputPartitionPathStatMap;
}
public HashMap<String, WorkloadStat> getOutputPartitionPathStatMap() {
return outputPartitionPathStatMap;
}
public boolean hasOutputWorkLoadStats() {
return hasOutputWorkLoadStats;
}
public void updateOutputPartitionPathStatMap(String partitionPath, WorkloadStat workloadStat) {
if (hasOutputWorkLoadStats) {
outputPartitionPathStatMap.put(partitionPath, workloadStat);
}
}
public WorkloadStat getWorkloadStat(String partitionPath) {
- return partitionPathStatMap.get(partitionPath);
+ return inputPartitionPathStatMap.get(partitionPath);
}
public WorkloadStat getOutputWorkloadStat(String partitionPath) {
return hasOutputWorkLoadStats ? outputPartitionPathStatMap.get(partitionPath) : inputPartitionPathStatMap.get(partitionPath);
}
public WriteOperationType getOperationType() {
@@ -81,7 +116,8 @@ public class WorkloadProfile implements Serializable {
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadProfile {");
sb.append("globalStat=").append(globalStat).append(", ");
sb.append("partitionStat=").append(partitionPathStatMap).append(", ");
sb.append("InputPartitionStat=").append(inputPartitionPathStatMap).append(", ");
sb.append("OutputPartitionStat=").append(outputPartitionPathStatMap).append(", ");
sb.append("operationType=").append(operationType);
sb.append('}');
return sb.toString();
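
For orientation, here is a minimal, hypothetical sketch (not part of the commit) of how the reworked WorkloadProfile is meant to be driven: the class and method names are taken from the diff above, while the import paths, partition path and counts are assumptions.

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.collection.Pair;   // assumed location of Hudi's Pair
import org.apache.hudi.table.WorkloadProfile;          // assumed package
import org.apache.hudi.table.WorkloadStat;             // assumed package

import java.util.HashMap;

public class WorkloadProfileSketch {
  public static void main(String[] args) {
    // Input-side stats: what the incoming batch looks like per partition,
    // i.e. what buildProfile(...) computes before any bucket assignment.
    HashMap<String, WorkloadStat> inputStats = new HashMap<>();
    WorkloadStat partitionStat = new WorkloadStat();
    partitionStat.addInserts(100L);
    inputStats.put("2022/02/28", partitionStat);

    WorkloadStat globalStat = new WorkloadStat();
    globalStat.addInserts(100L);

    // hasOutputWorkLoadStats mirrors table.getIndex().canIndexLogFiles(): only indexes that
    // can index log files (e.g. the HBase index) need the output-side view.
    WorkloadProfile profile = new WorkloadProfile(Pair.of(inputStats, globalStat), true);

    // The partitioner later records where the inserts actually land,
    // e.g. appended to the log of an existing file group.
    WorkloadStat outputStat = new WorkloadStat();
    outputStat.addInserts(new HoodieRecordLocation("001", "fileId-A"), 100L);
    profile.updateOutputPartitionPathStatMap("2022/02/28", outputStat);

    // With output stats enabled this returns the post-assignment view;
    // otherwise it falls back to the input-side stats.
    System.out.println(profile.getOutputWorkloadStat("2022/02/28").getNumInserts()); // 100
  }
}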

File: WorkloadStat.java

@@ -33,9 +33,12 @@ public class WorkloadStat implements Serializable {
private long numUpdates = 0L;
private HashMap<String, Pair<String, Long>> insertLocationToCount;
private HashMap<String, Pair<String, Long>> updateLocationToCount;
public WorkloadStat() {
insertLocationToCount = new HashMap<>();
updateLocationToCount = new HashMap<>();
}
@@ -43,6 +46,17 @@ public class WorkloadStat implements Serializable {
return this.numInserts += numInserts;
}
public long addInserts(HoodieRecordLocation location, long numInserts) {
long accNumInserts = 0;
if (insertLocationToCount.containsKey(location.getFileId())) {
accNumInserts = insertLocationToCount.get(location.getFileId()).getRight();
}
insertLocationToCount.put(
location.getFileId(),
Pair.of(location.getInstantTime(), numInserts + accNumInserts));
return this.numInserts += numInserts;
}
public long addUpdates(HoodieRecordLocation location, long numUpdates) {
long accNumUpdates = 0;
if (updateLocationToCount.containsKey(location.getFileId())) {
@@ -66,6 +80,10 @@ public class WorkloadStat implements Serializable {
return updateLocationToCount;
}
public HashMap<String, Pair<String, Long>> getInsertLocationToCount() {
return insertLocationToCount;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("WorkloadStat {");
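
A similarly hypothetical fragment for the new addInserts(HoodieRecordLocation, long): counts accumulate per target fileId and keep the instant time of that location, mirroring the existing updateLocationToCount bookkeeping. Import paths and values are assumed.

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.table.WorkloadStat;  // assumed package

public class WorkloadStatSketch {
  public static void main(String[] args) {
    WorkloadStat stat = new WorkloadStat();
    HoodieRecordLocation smallFileLocation = new HoodieRecordLocation("001", "fileId-A");

    stat.addInserts(smallFileLocation, 50L);  // first insert bucket routed into the small file group
    stat.addInserts(smallFileLocation, 25L);  // a later bucket lands in the same file group

    // insertLocationToCount now maps "fileId-A" -> ("001", 75)
    System.out.println(stat.getInsertLocationToCount().get("fileId-A").getRight()); // 75
    System.out.println(stat.getNumInserts());                                       // 75
  }
}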

File: BaseCommitActionExecutor.java

@@ -68,6 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
@@ -108,22 +109,32 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
throws HoodieCommitException {
try {
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
- profile.getPartitionPaths().forEach(path -> {
- WorkloadStat partitionStat = profile.getWorkloadStat(path);
+ profile.getOutputPartitionPaths().forEach(path -> {
+ WorkloadStat partitionStat = profile.getOutputWorkloadStat(path);
HoodieWriteStat insertStat = new HoodieWriteStat();
insertStat.setNumInserts(partitionStat.getNumInserts());
insertStat.setFileId("");
insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
metadata.addWriteStat(path, insertStat);
- partitionStat.getUpdateLocationToCount().forEach((key, value) -> {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setFileId(key);
- // TODO : Write baseCommitTime is possible here ?
- writeStat.setPrevCommit(value.getKey());
- writeStat.setNumUpdateWrites(value.getValue());
- metadata.addWriteStat(path, writeStat);
- });
+ Map<String, Pair<String, Long>> updateLocationMap = partitionStat.getUpdateLocationToCount();
+ Map<String, Pair<String, Long>> insertLocationMap = partitionStat.getInsertLocationToCount();
+ Stream.concat(updateLocationMap.keySet().stream(), insertLocationMap.keySet().stream())
+ .distinct()
+ .forEach(fileId -> {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setFileId(fileId);
+ Pair<String, Long> updateLocation = updateLocationMap.get(fileId);
+ Pair<String, Long> insertLocation = insertLocationMap.get(fileId);
+ // TODO : Write baseCommitTime is possible here ?
+ writeStat.setPrevCommit(updateLocation != null ? updateLocation.getKey() : insertLocation.getKey());
+ if (updateLocation != null) {
+ writeStat.setNumUpdateWrites(updateLocation.getValue());
+ }
+ if (insertLocation != null) {
+ writeStat.setNumInserts(insertLocation.getValue());
+ }
+ metadata.addWriteStat(path, writeStat);
+ });
});
metadata.setOperationType(operationType);
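
The loop above now emits one HoodieWriteStat per fileId that can carry both update and insert counts (previously only updates were attributed to a fileId). Below is a hypothetical, self-contained illustration of the resulting inflight metadata shape; the values are made up, while the setters and the NULL_COMMIT marker are the ones used in the diff.

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;

public class InflightMetadataSketch {
  public static void main(String[] args) {
    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
    String partition = "2022/02/28";

    // Marker stat kept for every partition: total inserts, no fileId.
    HoodieWriteStat insertStat = new HoodieWriteStat();
    insertStat.setNumInserts(3L);
    insertStat.setFileId("");
    insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
    metadata.addWriteStat(partition, insertStat);

    // Per-fileId stat that can now record inserts appended to a file group's log,
    // so a later rollback knows to append a rollback block for them.
    HoodieWriteStat fileStat = new HoodieWriteStat();
    fileStat.setFileId("fileId-A");
    fileStat.setPrevCommit("001");
    fileStat.setNumUpdateWrites(1L);
    fileStat.setNumInserts(2L);
    metadata.addWriteStat(partition, fileStat);

    System.out.println(metadata.getPartitionToWriteStats().get(partition).size()); // 2
  }
}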

File: HoodieCompactor.java

@@ -182,14 +182,30 @@ public abstract class HoodieCompactor<T extends HoodieRecordPayload, I, K, O> im
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
.build();
- if (!scanner.iterator().hasNext()) {
- scanner.close();
- return new ArrayList<>();
- }
Option<HoodieBaseFile> oldDataFileOpt =
operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());
+ // Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner.
+ // But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice.
+ if (!scanner.iterator().hasNext()) {
+ if (!oldDataFileOpt.isPresent()) {
+ scanner.close();
+ return new ArrayList<>();
+ } else {
+ // TODO: we may directly rename original parquet file if there is not evolution/devolution of schema
+ /*
+ TaskContextSupplier taskContextSupplier = hoodieCopyOnWriteTable.getTaskContextSupplier();
+ String newFileName = FSUtils.makeDataFileName(instantTime,
+ FSUtils.makeWriteToken(taskContextSupplier.getPartitionIdSupplier().get(), taskContextSupplier.getStageIdSupplier().get(), taskContextSupplier.getAttemptIdSupplier().get()),
+ operation.getFileId(), hoodieCopyOnWriteTable.getBaseFileExtension());
+ Path oldFilePath = new Path(oldDataFileOpt.get().getPath());
+ Path newFilePath = new Path(oldFilePath.getParent(), newFileName);
+ FileUtil.copy(fs,oldFilePath, fs, newFilePath, false, fs.getConf());
+ */
+ }
+ }
// Compacting is very similar to applying updates to existing file
Iterator<List<WriteStatus>> result;
// If the dataFile is present, perform updates else perform inserts into a new base file.

File: RollbackUtils.java

@@ -199,7 +199,7 @@ public class RollbackUtils {
partitionRollbackRequests.add(
ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath));
- // append rollback blocks for updates
+ // append rollback blocks for updates and inserts as A.2 and B.2
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
partitionRollbackRequests
.addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table));

File: BaseJavaCommitActionExecutor.java

@@ -91,27 +91,27 @@ public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
- WorkloadProfile profile = null;
+ WorkloadProfile workloadProfile = null;
if (isWorkloadProfileNeeded()) {
- profile = new WorkloadProfile(buildProfile(inputRecords));
- LOG.info("Workload profile :" + profile);
- try {
- saveWorkloadProfileMetadataToInflight(profile, instantTime);
- } catch (Exception e) {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
- try {
- if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
- throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
- }
- } catch (IOException ex) {
- LOG.error("Check file exists failed");
- throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
- }
- }
+ workloadProfile = new WorkloadProfile(buildProfile(inputRecords), table.getIndex().canIndexLogFiles());
+ LOG.info("Input workload profile :" + workloadProfile);
}
- final Partitioner partitioner = getPartitioner(profile);
+ final Partitioner partitioner = getPartitioner(workloadProfile);
+ try {
+ saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
+ } catch (Exception e) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
+ try {
+ if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
+ throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
+ }
+ } catch (IOException ex) {
+ LOG.error("Check file exists failed");
+ throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", ex);
+ }
+ }
Map<Integer, List<HoodieRecord<T>>> partitionedRecords = partition(inputRecords, partitioner);
List<WriteStatus> writeStatuses = new LinkedList<>();

File: JavaUpsertPartitioner.java

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
@@ -64,9 +65,9 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
*/
private int totalBuckets = 0;
/**
- * Stat for the current workload. Helps in determining inserts, upserts etc.
+ * Stat for the input and output workload. Describe the workload before and after being assigned buckets.
*/
- private WorkloadProfile profile;
+ private WorkloadProfile workloadProfile;
/**
* Helps decide which bucket an incoming update should go to.
*/
@@ -84,16 +85,16 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
protected final HoodieWriteConfig config;
- public JavaUpsertPartitioner(WorkloadProfile profile, HoodieEngineContext context, HoodieTable table,
+ public JavaUpsertPartitioner(WorkloadProfile workloadProfile, HoodieEngineContext context, HoodieTable table,
HoodieWriteConfig config) {
updateLocationToBucket = new HashMap<>();
partitionPathToInsertBucketInfos = new HashMap<>();
bucketInfoMap = new HashMap<>();
- this.profile = profile;
+ this.workloadProfile = workloadProfile;
this.table = table;
this.config = config;
- assignUpdates(profile);
- assignInserts(profile, context);
+ assignUpdates(workloadProfile);
+ assignInserts(workloadProfile, context);
LOG.info("Total Buckets :" + totalBuckets + ", buckets info => " + bucketInfoMap + ", \n"
+ "Partition to insert buckets => " + partitionPathToInsertBucketInfos + ", \n"
@@ -102,11 +103,19 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
- Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
+ Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat());
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
if (profile.hasOutputWorkLoadStats()) {
HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey());
outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue());
}
}
if (profile.hasOutputWorkLoadStats()) {
profile.updateOutputPartitionPathStatMap(partitionStat.getKey(), outputWorkloadStats);
}
}
}
@@ -133,6 +142,7 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat());
if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles = partitionSmallFilesMap.getOrDefault(partitionPath, new ArrayList<>());
@@ -158,6 +168,9 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
if (profile.hasOutputWorkLoadStats()) {
outputWorkloadStats.addInserts(smallFile.location, recordsToAppend);
}
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
totalUnassignedInserts -= recordsToAppend;
@@ -183,6 +196,9 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
}
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
bucketInfoMap.put(totalBuckets, bucketInfo);
if (profile.hasOutputWorkLoadStats()) {
outputWorkloadStats.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, bucketInfo.getFileIdPrefix()), recordsPerBucket.get(recordsPerBucket.size() - 1));
}
totalBuckets++;
}
}
@@ -200,6 +216,9 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
}
if (profile.hasOutputWorkLoadStats()) {
profile.updateOutputPartitionPathStatMap(partitionPath, outputWorkloadStats);
}
}
}
@@ -271,7 +290,7 @@ public class JavaUpsertPartitioner<T extends HoodieRecordPayload<T>> implements
String partitionPath = keyLocation.getLeft().getPartitionPath();
List<InsertBucketCumulativeWeightPair> targetBuckets = partitionPathToInsertBucketInfos.get(partitionPath);
// pick the target bucket to use based on the weights.
- final long totalInserts = Math.max(1, profile.getWorkloadStat(partitionPath).getNumInserts());
+ final long totalInserts = Math.max(1, workloadProfile.getWorkloadStat(partitionPath).getNumInserts());
final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", keyLocation.getLeft().getRecordKey());
final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts;
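
Both the Java partitioner above and the Spark UpsertPartitioner further down mirror their insert assignments into the output workload stats. Below is a simplified, hypothetical helper showing just that bookkeeping (the real code also sizes buckets and handles small-file thresholds); the class and parameter names are illustrative, the methods it calls come from the diff, and the import paths are assumed.

import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.table.WorkloadProfile;  // assumed package
import org.apache.hudi.table.WorkloadStat;     // assumed package

public class InsertAssignmentSketch {

  // Records where a partition's inserts were assigned; only active when the index can index log files.
  static void recordInsertAssignment(WorkloadProfile profile, String partitionPath,
                                     HoodieRecordLocation smallFileLocation, long appendedToSmallFile,
                                     String newFileIdPrefix, long writtenToNewFileGroup) {
    if (!profile.hasOutputWorkLoadStats()) {
      return;
    }
    WorkloadStat outputStats =
        profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat());

    // Inserts appended to an existing small file group keep that group's location, so a later
    // rollback can append a rollback block to the same log file.
    outputStats.addInserts(smallFileLocation, appendedToSmallFile);

    // Inserts going to a brand-new file group are recorded against the new fileId prefix,
    // with NULL_COMMIT as the previous instant.
    outputStats.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, newFileIdPrefix), writtenToNewFileGroup);

    profile.updateOutputPartitionPathStatMap(partitionPath, outputStats);
  }
}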

File: BaseSparkCommitActionExecutor.java

@@ -156,19 +156,22 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
LOG.info("RDD PreppedRecords was persisted at: " + inputRecordsRDD.getStorageLevel());
}
- WorkloadProfile profile = null;
+ WorkloadProfile workloadProfile = null;
if (isWorkloadProfileNeeded()) {
context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile");
- profile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType);
- LOG.info("Workload profile :" + profile);
- saveWorkloadProfileMetadataToInflight(profile, instantTime);
+ workloadProfile = new WorkloadProfile(buildProfile(inputRecordsRDD), operationType, table.getIndex().canIndexLogFiles());
+ LOG.info("Input workload profile :" + workloadProfile);
}
+ // partition using the insert partitioner
+ final Partitioner partitioner = getPartitioner(workloadProfile);
+ if (isWorkloadProfileNeeded()) {
+ saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
+ }
// handle records update with clustering
JavaRDD<HoodieRecord<T>> inputRecordsRDDWithClusteringUpdate = clusteringHandleUpdate(inputRecordsRDD);
- // partition using the insert partitioner
- final Partitioner partitioner = getPartitioner(profile);
context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data");
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDDWithClusteringUpdate, partitioner);
JavaRDD<WriteStatus> writeStatusRDD = partitionedRecords.mapPartitionsWithIndex((partition, recordItr) -> {

File: SparkBucketIndexPartitioner.java

@@ -90,7 +90,7 @@ public class SparkBucketIndexPartitioner<T extends HoodieRecordPayload<T>> exten
private void assignUpdates(WorkloadProfile profile) {
updatePartitionPathFileIds = new HashMap<>();
// each update location gets a partition
- Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap()
+ Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap()
.entrySet();
for (Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
if (!updatePartitionPathFileIds.containsKey(partitionStat.getKey())) {

File: UpsertPartitioner.java

@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.NumericUtils;
@@ -100,11 +101,19 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
private void assignUpdates(WorkloadProfile profile) {
// each update location gets a partition
- Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getPartitionPathStatMap().entrySet();
+ Set<Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionStat.getKey(), new WorkloadStat());
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
addUpdateBucket(partitionStat.getKey(), updateLocEntry.getKey());
if (profile.hasOutputWorkLoadStats()) {
HoodieRecordLocation hoodieRecordLocation = new HoodieRecordLocation(updateLocEntry.getValue().getKey(), updateLocEntry.getKey());
outputWorkloadStats.addUpdates(hoodieRecordLocation, updateLocEntry.getValue().getValue());
}
}
if (profile.hasOutputWorkLoadStats()) {
profile.updateOutputPartitionPathStatMap(partitionStat.getKey(), outputWorkloadStats);
}
}
}
@@ -161,6 +170,7 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
WorkloadStat outputWorkloadStats = profile.getOutputPartitionPathStatMap().getOrDefault(partitionPath, new WorkloadStat());
if (pStat.getNumInserts() > 0) {
List<SmallFile> smallFiles =
@@ -189,6 +199,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
}
if (profile.hasOutputWorkLoadStats()) {
outputWorkloadStats.addInserts(smallFile.location, recordsToAppend);
}
bucketNumbers.add(bucket);
recordsPerBucket.add(recordsToAppend);
totalUnassignedInserts -= recordsToAppend;
@@ -218,6 +231,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
}
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
bucketInfoMap.put(totalBuckets, bucketInfo);
if (profile.hasOutputWorkLoadStats()) {
outputWorkloadStats.addInserts(new HoodieRecordLocation(HoodieWriteStat.NULL_COMMIT, bucketInfo.getFileIdPrefix()), recordsPerBucket.get(recordsPerBucket.size() - 1));
}
totalBuckets++;
}
}
@@ -235,6 +251,9 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
LOG.info("Total insert buckets for partition path " + partitionPath + " => " + insertBuckets);
partitionPathToInsertBucketInfos.put(partitionPath, insertBuckets);
}
if (profile.hasOutputWorkLoadStats()) {
profile.updateOutputPartitionPathStatMap(partitionPath, outputWorkloadStats);
}
}
}
@@ -302,6 +321,11 @@ public class UpsertPartitioner<T extends HoodieRecordPayload<T>> extends SparkHo
return totalBuckets;
}
@Override
public int getNumPartitions() {
return totalBuckets;
}
@Override
public int getPartition(Object key) {
Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation =

File: TestMergeOnReadRollbackActionExecutor.java

@@ -20,17 +20,33 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -40,9 +56,11 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH;
import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH;
@@ -139,6 +157,131 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
assertFalse(WriteMarkersFactory.get(cfg.getMarkersType(), table, "002").doesMarkerDirExist());
}
@Test
public void testRollbackForCanIndexLogFile() throws IOException {
cleanupResources();
setUpDFS();
//1. prepare data and assert data result
//just generate one partitions
dataGen = new HoodieTestDataGenerator(new String[]{DEFAULT_FIRST_PARTITION_PATH});
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2).withDeleteParallelism(2)
.withTimelineLayoutVersion(TimelineLayoutVersion.CURR_VERSION)
.withWriteStatusClass(MetadataMergeWriteStatus.class)
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024).parquetMaxFileSize(1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withEnableBackupForRemoteFileSystemView(false) // Fail test if problem connecting to timeline-server
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build()).withRollbackUsingMarkers(false).withAutoCommit(false).build();
//1. prepare data
HoodieTestDataGenerator.writePartitionMetadata(fs, new String[]{DEFAULT_FIRST_PARTITION_PATH}, basePath);
SparkRDDWriteClient client = getHoodieWriteClient(cfg);
// Write 1 (only inserts)
String newCommitTime = "001";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_FIRST_PARTITION_PATH);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);
JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime);
org.apache.hudi.testutils.Assertions.assertNoWriteErrors(statuses.collect());
client.commit(newCommitTime, statuses);
// check fileSlice
HoodieTable table = this.getHoodieTable(metaClient, cfg);
SyncableFileSystemView fsView = getFileSystemViewWithUnCommittedSlices(table.getMetaClient());
List<HoodieFileGroup> firstPartitionCommit2FileGroups = fsView.getAllFileGroups(DEFAULT_FIRST_PARTITION_PATH).collect(Collectors.toList());
assertEquals(1, firstPartitionCommit2FileGroups.size());
assertEquals(1, (int) firstPartitionCommit2FileGroups.get(0).getAllFileSlices().count());
assertFalse(firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getBaseFile().isPresent());
assertEquals(1, firstPartitionCommit2FileGroups.get(0).getAllFileSlices().findFirst().get().getLogFiles().count());
String generatedFileID = firstPartitionCommit2FileGroups.get(0).getFileGroupId().getFileId();
// check hoodieCommitMeta
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
table.getMetaClient().getCommitTimeline()
.getInstantDetails(new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, "001"))
.get(),
HoodieCommitMetadata.class);
List<HoodieWriteStat> firstPartitionWriteStat = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
assertEquals(2, firstPartitionWriteStat.size());
// we have an empty writeStat for all partition
assert firstPartitionWriteStat.stream().anyMatch(wStat -> StringUtils.isNullOrEmpty(wStat.getFileId()));
// we have one non-empty writeStat which must contains update or insert
assertEquals(1, firstPartitionWriteStat.stream().filter(wStat -> !StringUtils.isNullOrEmpty(wStat.getFileId())).count());
firstPartitionWriteStat.stream().filter(wStat -> !StringUtils.isNullOrEmpty(wStat.getFileId())).forEach(wStat -> {
assert wStat.getNumInserts() > 0;
});
// Write 2 (inserts)
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> updateRecords = Collections.singletonList(dataGen.generateUpdateRecord(records.get(0).getKey(), newCommitTime));
List<HoodieRecord> insertRecordsInSamePartition = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_FIRST_PARTITION_PATH);
List<HoodieRecord> insertRecordsInOtherPartition = dataGen.generateInsertsForPartition(newCommitTime, 2, DEFAULT_SECOND_PARTITION_PATH);
List<HoodieRecord> recordsToBeWrite = Stream.concat(Stream.concat(updateRecords.stream(), insertRecordsInSamePartition.stream()), insertRecordsInOtherPartition.stream())
.collect(Collectors.toList());
writeRecords = jsc.parallelize(recordsToBeWrite, 1);
statuses = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, statuses);
table = this.getHoodieTable(metaClient, cfg);
commitMetadata = HoodieCommitMetadata.fromBytes(
table.getMetaClient().getCommitTimeline()
.getInstantDetails(new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime))
.get(),
HoodieCommitMetadata.class);
assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_FIRST_PARTITION_PATH);
assert commitMetadata.getPartitionToWriteStats().containsKey(DEFAULT_SECOND_PARTITION_PATH);
List<HoodieWriteStat> hoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_FIRST_PARTITION_PATH);
// Both update and insert record should enter same existing fileGroup due to small file handling
assertEquals(1, hoodieWriteStatOptionList.size());
assertEquals(generatedFileID, hoodieWriteStatOptionList.get(0).getFileId());
// check insert and update numbers
assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts());
assertEquals(1, hoodieWriteStatOptionList.get(0).getNumUpdateWrites());
List<HoodieWriteStat> secondHoodieWriteStatOptionList = commitMetadata.getPartitionToWriteStats().get(DEFAULT_SECOND_PARTITION_PATH);
// All insert should enter one fileGroup
assertEquals(1, secondHoodieWriteStatOptionList.size());
String fileIdInPartitionTwo = secondHoodieWriteStatOptionList.get(0).getFileId();
assertEquals(2, hoodieWriteStatOptionList.get(0).getNumInserts());
// Rollback
HoodieInstant rollBackInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION, "002");
BaseRollbackPlanActionExecutor mergeOnReadRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, cfg, table, "003", rollBackInstant, false,
cfg.shouldRollbackUsingMarkers());
mergeOnReadRollbackPlanActionExecutor.execute().get();
MergeOnReadRollbackActionExecutor mergeOnReadRollbackActionExecutor = new MergeOnReadRollbackActionExecutor(
context,
cfg,
table,
"003",
rollBackInstant,
true,
false);
//3. assert the rollback stat
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
assertEquals(2, rollbackMetadata.size());
//4. assert filegroup after rollback, and compare to the rollbackstat
// assert the first partition data and log file size
HoodieRollbackPartitionMetadata partitionMetadata = rollbackMetadata.get(DEFAULT_FIRST_PARTITION_PATH);
assertTrue(partitionMetadata.getSuccessDeleteFiles().isEmpty());
assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty());
assertEquals(1, partitionMetadata.getRollbackLogFiles().size());
// assert the second partition data and log file size
partitionMetadata = rollbackMetadata.get(DEFAULT_SECOND_PARTITION_PATH);
assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
assertTrue(partitionMetadata.getFailedDeleteFiles().isEmpty());
assertTrue(partitionMetadata.getRollbackLogFiles().isEmpty());
assertEquals(1, partitionMetadata.getSuccessDeleteFiles().size());
}
@Test
public void testFailForCompletedInstants() {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
@@ -169,4 +312,13 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
client.rollback("001");
}
}
private void setUpDFS() throws IOException {
initDFS();
initSparkContexts();
//just generate two partitions
dataGen = new HoodieTestDataGenerator(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH});
initFileSystem();
initDFSMetaClient();
}
}
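
The test above drives this code path with the in-memory index, which can index log files; the production scenario the commit targets is the HBase index. Below is a hedged sketch of a write config opting into that index; a real setup would additionally need HBase/ZooKeeper connection settings (e.g. via HoodieHBaseIndexConfig), which are omitted here, and the class and method name of the sketch itself are illustrative.

import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class HbaseIndexWriteConfigSketch {
  static HoodieWriteConfig hbaseIndexedConfig(String basePath) {
    // With an index that can index log files, first-time inserts may be appended to existing
    // log files; rolling them back relies on the rollback blocks this commit introduces.
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.HBASE)  // an index type that can index log files
            .build())
        .build();
  }
}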