From 819e8018ff926fea21c95e75ae1b7991afb227c2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 2 Feb 2022 13:10:51 -0800 Subject: [PATCH] [HUDI-3322][HUDI-3343] Fixing Metadata Table Records Duplication Issues (#4716) This change is addressing issues in regards to Metadata Table observing ingesting duplicated records leading to it persisting incorrect file-sizes for the files referred to in those records. There are multiple issues that were leading to that: - [HUDI-3322] Incorrect Rollback Plan generation: Rollback Plan generated for MOR tables was overly expansively listing all log-files with the latest base-instant as the ones that have been affected by the rollback, leading to invalid MT records being ingested referring to those. - [HUDI-3343] Metadata Table including Uncommitted Log Files during Bootstrap: Since MT is bootstrapped at the end of the commit operation execution (after FS activity, but before committing to the timeline), it was actually incorrectly ingesting some files that were part of the intermediate state of the operation being committed. 
This change will unblock Stack of PRs based off #4556 --- .../hudi/client/BaseHoodieWriteClient.java | 4 +- .../action/rollback/BaseRollbackHelper.java | 46 ++++---- .../rollback/ListingBasedRollbackHelper.java | 32 +++--- .../rollback/ListingBasedRollbackRequest.java | 31 ++++-- .../rollback/MarkerBasedRollbackStrategy.java | 55 +++++----- .../table/action/rollback/RollbackUtils.java | 100 +++++++++--------- .../utils/TestMetadataConversionUtils.java | 1 - .../hudi/client/SparkRDDWriteClient.java | 8 +- ...stHoodieSparkMergeOnReadTableRollback.java | 11 +- .../TestMarkerBasedRollbackStrategy.java | 3 - .../src/main/avro/HoodieRollbackMetadata.avsc | 8 -- .../hudi/common/HoodieRollbackStat.java | 20 +--- .../org/apache/hudi/common/fs/FSUtils.java | 23 ++-- .../common/model/HoodieCommitMetadata.java | 7 +- .../table/timeline/TimelineMetadataUtils.java | 4 +- .../metadata/HoodieTableMetadataUtil.java | 14 +-- .../hudi/common/table/TestTimelineUtils.java | 3 +- .../table/view/TestIncrementalFSViewSync.java | 2 +- .../common/testutils/HoodieTestTable.java | 1 - 19 files changed, 175 insertions(+), 198 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 2e04d01af..867e2f99e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -639,8 +639,8 @@ public abstract class BaseHoodieWriteClient rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())).orElse(table.scheduleRollback(context, rollbackInstantTime, - commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); + Option rollbackPlanOption = pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan())) + .orElseGet(() -> 
table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false, config.shouldRollbackUsingMarkers())); if (rollbackPlanOption.isPresent()) { // execute rollback HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index 078d9ac27..189de373d 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -18,6 +18,9 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -33,11 +36,6 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -122,19 +120,11 @@ public class BaseRollbackHelper implements Serializable { rollbackStats.forEach(entry -> partitionToRollbackStats.add(Pair.of(entry.getPartitionPath(), entry))); return partitionToRollbackStats.stream(); } else if (!rollbackRequest.getLogBlocksToBeDeleted().isEmpty()) { - Map logFilesToBeDeleted = rollbackRequest.getLogBlocksToBeDeleted(); - String fileId = 
rollbackRequest.getFileId(); - String latestBaseInstant = rollbackRequest.getLatestBaseInstant(); - FileSystem fs = metaClient.getFs(); - // collect all log files that is supposed to be deleted with this rollback - // what happens if file was deleted when invoking fs.getFileStatus(?) below. - // I understand we don't delete log files. but just curious if we need to handle this case. - Map writtenLogFileSizeMap = new HashMap<>(); - for (Map.Entry entry : logFilesToBeDeleted.entrySet()) { - writtenLogFileSizeMap.put(fs.getFileStatus(new Path(entry.getKey())), entry.getValue()); - } HoodieLogFormat.Writer writer = null; try { + String fileId = rollbackRequest.getFileId(); + String latestBaseInstant = rollbackRequest.getLatestBaseInstant(); + writer = HoodieLogFormat.newWriterBuilder() .onParentPath(FSUtils.getPartitionPath(metaClient.getBasePath(), rollbackRequest.getPartitionPath())) .withFileId(fileId) @@ -156,7 +146,7 @@ public class BaseRollbackHelper implements Serializable { writer.close(); } } catch (IOException io) { - throw new HoodieIOException("Error appending rollback block..", io); + throw new HoodieIOException("Error appending rollback block", io); } } @@ -167,15 +157,21 @@ public class BaseRollbackHelper implements Serializable { metaClient.getFs().getFileStatus(Objects.requireNonNull(writer).getLogFile().getPath()), 1L ); - return Collections.singletonList(Pair.of(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .withRollbackBlockAppendResults(filesToNumBlocksRollback) - .withWrittenLogFileSizeMap(writtenLogFileSizeMap).build())).stream(); + + return Collections.singletonList( + Pair.of(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder() + .withPartitionPath(rollbackRequest.getPartitionPath()) + .withRollbackBlockAppendResults(filesToNumBlocksRollback) + .build())) + .stream(); } else { - return Collections - 
.singletonList(Pair.of(rollbackRequest.getPartitionPath(), - HoodieRollbackStat.newBuilder().withPartitionPath(rollbackRequest.getPartitionPath()) - .build())).stream(); + return Collections.singletonList( + Pair.of(rollbackRequest.getPartitionPath(), + HoodieRollbackStat.newBuilder() + .withPartitionPath(rollbackRequest.getPartitionPath()) + .build())) + .stream(); } }, numPartitions); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java index b47136fa0..628b2fc37 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java @@ -19,18 +19,17 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieFileFormat; -import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.config.HoodieWriteConfig; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -38,7 +37,6 @@ import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; 
import java.util.Map; import java.util.stream.Collectors; @@ -104,22 +102,20 @@ public class ListingBasedRollbackHelper implements Serializable { case APPEND_ROLLBACK_BLOCK: { String fileId = rollbackRequest.getFileId().get(); String latestBaseInstant = rollbackRequest.getLatestBaseInstant().get(); - // collect all log files that is supposed to be deleted with this rollback - Map writtenLogFileSizeMap = FSUtils.getAllLogFiles(metaClient.getFs(), - FSUtils.getPartitionPath(config.getBasePath(), rollbackRequest.getPartitionPath()), - fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), latestBaseInstant) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - Map logFilesToBeDeleted = new HashMap<>(); - for (Map.Entry fileToBeDeleted : writtenLogFileSizeMap.entrySet()) { - logFilesToBeDeleted.put(fileToBeDeleted.getKey().getPath().toString(), fileToBeDeleted.getValue()); - } + HoodieWriteStat writeStat = rollbackRequest.getWriteStat().get(); + + Path fullLogFilePath = FSUtils.getPartitionPath(config.getBasePath(), writeStat.getPath()); + + Map logFilesWithBlocksToRollback = + Collections.singletonMap(fullLogFilePath.toString(), writeStat.getTotalWriteBytes()); + return new HoodieRollbackRequest(rollbackRequest.getPartitionPath(), fileId, latestBaseInstant, - Collections.EMPTY_LIST, logFilesToBeDeleted); + Collections.EMPTY_LIST, logFilesWithBlocksToRollback); } default: throw new IllegalStateException("Unknown Rollback action " + rollbackRequest); } - }, numPartitions).stream().collect(Collectors.toList()); + }, numPartitions); } private FileStatus[] getBaseFilesToBeDeleted(HoodieTableMetaClient metaClient, HoodieWriteConfig config, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java index bc2bbf20a..7411231bb 100644 --- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackRequest.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.util.Option; import java.io.Serializable; @@ -51,32 +52,42 @@ public class ListingBasedRollbackRequest implements Serializable { */ private final Option latestBaseInstant; + /** + * TODO + */ + private final Option writeStat; + private final Type type; + public ListingBasedRollbackRequest(String partitionPath, Type type) { + this(partitionPath, Option.empty(), Option.empty(), Option.empty(), type); + } + public ListingBasedRollbackRequest(String partitionPath, Option fileId, Option latestBaseInstant, + Option writeStat, Type type) { this.partitionPath = partitionPath; this.fileId = fileId; this.latestBaseInstant = latestBaseInstant; + this.writeStat = writeStat; this.type = type; } public static ListingBasedRollbackRequest createRollbackRequestWithDeleteDataFilesOnlyAction(String partitionPath) { - return new ListingBasedRollbackRequest(partitionPath, Option.empty(), Option.empty(), - Type.DELETE_DATA_FILES_ONLY); + return new ListingBasedRollbackRequest(partitionPath, Type.DELETE_DATA_FILES_ONLY); } public static ListingBasedRollbackRequest createRollbackRequestWithDeleteDataAndLogFilesAction(String partitionPath) { - return new ListingBasedRollbackRequest(partitionPath, Option.empty(), Option.empty(), - Type.DELETE_DATA_AND_LOG_FILES); + return new ListingBasedRollbackRequest(partitionPath, Type.DELETE_DATA_AND_LOG_FILES); } - public static ListingBasedRollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, String fileId, - String baseInstant) { - return new ListingBasedRollbackRequest(partitionPath, Option.of(fileId), Option.of(baseInstant), - 
Type.APPEND_ROLLBACK_BLOCK); + public static ListingBasedRollbackRequest createRollbackRequestWithAppendRollbackBlockAction(String partitionPath, + String fileId, + String baseInstant, + HoodieWriteStat writeStat) { + return new ListingBasedRollbackRequest(partitionPath, Option.of(fileId), Option.of(baseInstant), Option.of(writeStat), Type.APPEND_ROLLBACK_BLOCK); } public String getPartitionPath() { @@ -91,6 +102,10 @@ public class ListingBasedRollbackRequest implements Serializable { return latestBaseInstant; } + public Option getWriteStat() { + return writeStat; + } + public Type getType() { return type; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java index 9d04e3036..e7a4170ec 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/MarkerBasedRollbackStrategy.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.model.HoodieRollbackRequest; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; @@ -31,18 +32,13 @@ import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.marker.MarkerBasedRollbackUtils; import org.apache.hudi.table.marker.WriteMarkers; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static 
org.apache.hudi.table.action.rollback.BaseRollbackHelper.EMPTY_STRING; @@ -90,42 +86,41 @@ public class MarkerBasedRollbackStrategy Collections.singletonList(fullDeletePath.toString()), Collections.emptyMap()); case APPEND: + // NOTE: This marker file-path does NOT correspond to a log-file, but rather is a phony + // path serving as a "container" for the following components: + // - Base file's file-id + // - Base file's commit instant + // - Partition path return getRollbackRequestForAppend(WriteMarkers.stripMarkerSuffix(markerFilePath)); default: throw new HoodieRollbackException("Unknown marker type, during rollback of " + instantToRollback); } - }, parallelism).stream().collect(Collectors.toList()); + }, parallelism); } catch (Exception e) { throw new HoodieRollbackException("Error rolling back using marker files written for " + instantToRollback, e); } } - protected HoodieRollbackRequest getRollbackRequestForAppend(String appendBaseFilePath) throws IOException { - Path baseFilePathForAppend = new Path(basePath, appendBaseFilePath); + protected HoodieRollbackRequest getRollbackRequestForAppend(String markerFilePath) throws IOException { + Path baseFilePathForAppend = new Path(basePath, markerFilePath); String fileId = FSUtils.getFileIdFromFilePath(baseFilePathForAppend); String baseCommitTime = FSUtils.getCommitTime(baseFilePathForAppend.getName()); - String partitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), new Path(basePath, appendBaseFilePath).getParent()); - Map writtenLogFileSizeMap = getWrittenLogFileSizeMap(partitionPath, baseCommitTime, fileId); - Map writtenLogFileStrSizeMap = new HashMap<>(); - for (Map.Entry entry : writtenLogFileSizeMap.entrySet()) { - writtenLogFileStrSizeMap.put(entry.getKey().getPath().toString(), entry.getValue()); - } - return new HoodieRollbackRequest(partitionPath, fileId, baseCommitTime, Collections.emptyList(), writtenLogFileStrSizeMap); + String relativePartitionPath = FSUtils.getRelativePartitionPath(new 
Path(basePath), baseFilePathForAppend.getParent()); + Path partitionPath = FSUtils.getPartitionPath(config.getBasePath(), relativePartitionPath); + + // NOTE: Since we're rolling back incomplete Delta Commit, it only could have appended its + // block to the latest log-file + // TODO(HUDI-1517) use provided marker-file's path instead + HoodieLogFile latestLogFile = FSUtils.getLatestLogFile(table.getMetaClient().getFs(), partitionPath, fileId, + HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime).get(); + + // NOTE: Marker's don't carry information about the cumulative size of the blocks that have been appended, + // therefore we simply stub this value. + Map logFilesWithBlocsToRollback = + Collections.singletonMap(latestLogFile.getFileStatus().getPath().toString(), -1L); + + return new HoodieRollbackRequest(relativePartitionPath, fileId, baseCommitTime, Collections.emptyList(), + logFilesWithBlocsToRollback); } - /** - * Returns written log file size map for the respective baseCommitTime to assist in metadata table syncing. 
- * - * @param partitionPathStr partition path of interest - * @param baseCommitTime base commit time of interest - * @param fileId fileId of interest - * @return Map - * @throws IOException - */ - private Map getWrittenLogFileSizeMap(String partitionPathStr, String baseCommitTime, String fileId) throws IOException { - // collect all log files that is supposed to be deleted with this rollback - return FSUtils.getAllLogFiles(table.getMetaClient().getFs(), - FSUtils.getPartitionPath(config.getBasePath(), partitionPathStr), fileId, HoodieFileFormat.HOODIE_LOG.getFileExtension(), baseCommitTime) - .collect(Collectors.toMap(HoodieLogFile::getFileStatus, value -> value.getFileStatus().getLen())); - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java index a4b59a88b..6c17ee236 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java @@ -18,6 +18,7 @@ package org.apache.hudi.table.action.rollback; +import org.apache.hadoop.fs.FileStatus; import org.apache.hudi.avro.model.HoodieRollbackPlan; import org.apache.hudi.common.HoodieRollbackStat; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -33,12 +34,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.table.HoodieTable; - -import org.apache.hadoop.fs.FileStatus; import org.apache.log4j.LogManager; import 
org.apache.log4j.Logger; @@ -48,8 +45,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + public class RollbackUtils { private static final Logger LOG = LogManager.getLogger(RollbackUtils.class); @@ -88,7 +88,7 @@ public class RollbackUtils { * @return Merged HoodieRollbackStat */ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRollbackStat stat2) { - ValidationUtils.checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath())); + checkArgument(stat1.getPartitionPath().equals(stat2.getPartitionPath())); final List successDeleteFiles = new ArrayList<>(); final List failedDeleteFiles = new ArrayList<>(); final Map commandBlocksCount = new HashMap<>(); @@ -99,9 +99,7 @@ public class RollbackUtils { Option.ofNullable(stat2.getFailedDeleteFiles()).ifPresent(failedDeleteFiles::addAll); Option.ofNullable(stat1.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll); Option.ofNullable(stat2.getCommandBlocksCount()).ifPresent(commandBlocksCount::putAll); - Option.ofNullable(stat1.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll); - Option.ofNullable(stat2.getWrittenLogFileSizeMap()).ifPresent(writtenLogFileSizeMap::putAll); - return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap); + return new HoodieRollbackStat(stat1.getPartitionPath(), successDeleteFiles, failedDeleteFiles, commandBlocksCount); } /** @@ -191,28 +189,22 @@ public class RollbackUtils { // (B.3) Rollback triggered for first commit - Same as (B.1) // (B.4) Rollback triggered for recurring commits - Same as (B.2) plus we need to delete the log files // as well if the base base file gets deleted. 
- try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - table.getMetaClient().getCommitTimeline() - .getInstantDetails(new HoodieInstant(true, instantToRollback.getAction(), instantToRollback.getTimestamp())) - .get(), - HoodieCommitMetadata.class); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + table.getMetaClient().getCommitTimeline().getInstantDetails(instantToRollback).get(), + HoodieCommitMetadata.class); - // In case all data was inserts and the commit failed, delete the file belonging to that commit - // We do not know fileIds for inserts (first inserts are either log files or base files), - // delete all files for the corresponding failed commit, if present (same as COW) - partitionRollbackRequests.add( - ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); + // In case all data was inserts and the commit failed, delete the file belonging to that commit + // We do not know fileIds for inserts (first inserts are either log files or base files), + // delete all files for the corresponding failed commit, if present (same as COW) + partitionRollbackRequests.add( + ListingBasedRollbackRequest.createRollbackRequestWithDeleteDataAndLogFilesAction(partitionPath)); - // append rollback blocks for updates - if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { - partitionRollbackRequests - .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); - } - break; - } catch (IOException io) { - throw new HoodieIOException("Failed to collect rollback actions for commit " + commit, io); + // append rollback blocks for updates + if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) { + partitionRollbackRequests + .addAll(generateAppendRollbackBlocksAction(partitionPath, instantToRollback, commitMetadata, table)); } + break; default: break; } @@ -222,7 +214,7 @@ public class RollbackUtils { 
private static List generateAppendRollbackBlocksAction(String partitionPath, HoodieInstant rollbackInstant, HoodieCommitMetadata commitMetadata, HoodieTable table) { - ValidationUtils.checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); + checkArgument(rollbackInstant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)); // wStat.getPrevCommit() might not give the right commit time in the following // scenario : If a compaction was scheduled, the new commitTime associated with the requested compaction will be @@ -230,30 +222,40 @@ public class RollbackUtils { // But the index (global) might store the baseCommit of the base and not the requested, hence get the // baseCommit always by listing the file slice // With multi writers, rollbacks could be lazy. and so we need to use getLatestFileSlicesBeforeOrOn() instead of getLatestFileSlices() - Map fileIdToBaseCommitTimeForLogMap = table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), - true).collect(Collectors.toMap(FileSlice::getFileId, FileSlice::getBaseInstantTime)); + Map latestFileSlices = table.getSliceView() + .getLatestFileSlicesBeforeOrOn(partitionPath, rollbackInstant.getTimestamp(), true) + .collect(Collectors.toMap(FileSlice::getFileId, Function.identity())); - return commitMetadata.getPartitionToWriteStats().get(partitionPath).stream().filter(wStat -> { + return commitMetadata.getPartitionToWriteStats().get(partitionPath) + .stream() + .filter(writeStat -> { + // Filter out stats without prevCommit since they are all inserts + boolean validForRollback = (writeStat != null) && (!writeStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) + && (writeStat.getPrevCommit() != null) && latestFileSlices.containsKey(writeStat.getFileId()); - // Filter out stats without prevCommit since they are all inserts - boolean validForRollback = (wStat != null) && (!wStat.getPrevCommit().equals(HoodieWriteStat.NULL_COMMIT)) - && 
(wStat.getPrevCommit() != null) && fileIdToBaseCommitTimeForLogMap.containsKey(wStat.getFileId()); + if (!validForRollback) { + return false; + } - if (validForRollback) { - // For sanity, log instant time can never be less than base-commit on which we are rolling back - ValidationUtils - .checkArgument(HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()), - HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp())); - } + FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); - return validForRollback && HoodieTimeline.compareTimestamps(fileIdToBaseCommitTimeForLogMap.get( - // Base Ts should be strictly less. If equal (for inserts-to-logs), the caller employs another option - // to delete and we should not step on it - wStat.getFileId()), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); - }).map(wStat -> { - String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, wStat.getFileId(), - baseCommitTime); - }).collect(Collectors.toList()); + // For sanity, log-file base-instant time can never be less than base-commit on which we are rolling back + checkArgument( + HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), + HoodieTimeline.LESSER_THAN_OR_EQUALS, rollbackInstant.getTimestamp()), + "Log-file base-instant could not be less than the instant being rolled back"); + + // Command block "rolling back" the preceding block {@link HoodieCommandBlockTypeEnum#ROLLBACK_PREVIOUS_BLOCK} + // w/in the latest file-slice is appended iff base-instant of the log-file is _strictly_ less + // than the instant of the Delta Commit being rolled back. Otherwise, log-file will be cleaned up + // in a different branch of the flow. 
+ return HoodieTimeline.compareTimestamps(latestFileSlice.getBaseInstantTime(), HoodieTimeline.LESSER_THAN, rollbackInstant.getTimestamp()); + }) + .map(writeStat -> { + FileSlice latestFileSlice = latestFileSlices.get(writeStat.getFileId()); + return ListingBasedRollbackRequest.createRollbackRequestWithAppendRollbackBlockAction(partitionPath, + writeStat.getFileId(), latestFileSlice.getBaseInstantTime(), writeStat); + }) + .collect(Collectors.toList()); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java index c0952bc5a..415c12a64 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java @@ -188,7 +188,6 @@ public class TestMetadataConversionUtils extends HoodieCommonTestHarness { rollbackPartitionMetadata.setPartitionPath("p1"); rollbackPartitionMetadata.setSuccessDeleteFiles(Arrays.asList("f1")); rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>()); - rollbackPartitionMetadata.setWrittenLogFiles(new HashMap<>()); rollbackPartitionMetadata.setRollbackLogFiles(new HashMap<>()); Map partitionMetadataMap = new HashMap<>(); partitionMetadataMap.put("p1", rollbackPartitionMetadata); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index f541720fd..6282d7beb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -425,7 +425,8 @@ public class SparkRDDWriteClient extends } @Override - protected HoodieTable>, JavaRDD, JavaRDD> 
getTableAndInitCtx(WriteOperationType operationType, String instantTime) { + protected HoodieTable>, JavaRDD, JavaRDD> getTableAndInitCtx(WriteOperationType operationType, + String instantTime) { HoodieTableMetaClient metaClient = createMetaClient(true); UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade( metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()); @@ -442,8 +443,11 @@ public class SparkRDDWriteClient extends metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance()) .run(HoodieTableVersion.current(), instantTime); metaClient.reloadActiveTimeline(); - initializeMetadataTable(Option.of(instantTime)); } + // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation, + // if it didn't exist before + // See https://issues.apache.org/jira/browse/HUDI-3343 for more details + initializeMetadataTable(Option.of(instantTime)); } finally { this.txnManager.endTransaction(); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index 38becc92c..3b5ce0fa1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -142,8 +142,14 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction @ParameterizedTest @ValueSource(booleans = {true, false}) void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception { - HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()); + // NOTE: First writer will have Metadata table 
DISABLED + HoodieWriteConfig.Builder cfgBuilder = + getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE) + .withMetadataConfig( + HoodieMetadataConfig.newBuilder() + .enable(false) + .build()); + addConfigsForPopulateMetaFields(cfgBuilder, true); HoodieWriteConfig cfg = cfgBuilder.build(); @@ -194,6 +200,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction */ final String commitTime1 = "002"; // WriteClient with custom config (disable small file handling) + // NOTE: Second writer will have Metadata table ENABLED try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) { secondClient.startCommitWithTime(commitTime1); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java index 8b23cf257..fd2af1cdc 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestMarkerBasedRollbackStrategy.java @@ -133,7 +133,6 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { assertEquals(1, stat.getSuccessDeleteFiles().size()); assertEquals(0, stat.getFailedDeleteFiles().size()); assertEquals(0, stat.getCommandBlocksCount().size()); - assertEquals(0, stat.getWrittenLogFileSizeMap().size()); } } } @@ -162,8 +161,6 @@ public class TestMarkerBasedRollbackStrategy extends HoodieClientTestBase { assertEquals(0, stat.getFailedDeleteFiles().size()); assertEquals(1, stat.getCommandBlocksCount().size()); stat.getCommandBlocksCount().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))); - assertEquals(1, stat.getWrittenLogFileSizeMap().size()); 
- stat.getWrittenLogFileSizeMap().forEach((fileStatus, len) -> assertTrue(fileStatus.getPath().getName().contains(HoodieFileFormat.HOODIE_LOG.getFileExtension()))); } } } diff --git a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc index f342db873..5a300cda9 100644 --- a/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieRollbackMetadata.avsc @@ -38,14 +38,6 @@ "type": "long", "doc": "Size of this file in bytes" } - }], "default":null }, - {"name": "writtenLogFiles", "type": ["null", { - "type": "map", - "doc": "Log files written that were expected to be rolledback", - "values": { - "type": "long", - "doc": "Size of this file in bytes" - } }], "default":null } ] }}}, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java index 3e4ee3431..a3191fa02 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieRollbackStat.java @@ -38,16 +38,13 @@ public class HoodieRollbackStat implements Serializable { private final List failedDeleteFiles; // Count of HoodieLogFile to commandBlocks written for a particular rollback private final Map commandBlocksCount; - // all log files with same base instant as instant to be rolledback - private final Map writtenLogFileSizeMap; public HoodieRollbackStat(String partitionPath, List successDeleteFiles, List failedDeleteFiles, - Map commandBlocksCount, Map writtenLogFileSizeMap) { + Map commandBlocksCount) { this.partitionPath = partitionPath; this.successDeleteFiles = successDeleteFiles; this.failedDeleteFiles = failedDeleteFiles; this.commandBlocksCount = commandBlocksCount; - this.writtenLogFileSizeMap = writtenLogFileSizeMap; } public Map getCommandBlocksCount() { @@ -66,10 +63,6 @@ public class HoodieRollbackStat implements Serializable { 
return failedDeleteFiles; } - public Map getWrittenLogFileSizeMap() { - return writtenLogFileSizeMap; - } - public static HoodieRollbackStat.Builder newBuilder() { return new Builder(); } @@ -82,7 +75,6 @@ public class HoodieRollbackStat implements Serializable { private List successDeleteFiles; private List failedDeleteFiles; private Map commandBlocksCount; - private Map writtenLogFileSizeMap; private String partitionPath; public Builder withDeletedFileResults(Map deletedFiles) { @@ -108,11 +100,6 @@ public class HoodieRollbackStat implements Serializable { return this; } - public Builder withWrittenLogFileSizeMap(Map writtenLogFileSizeMap) { - this.writtenLogFileSizeMap = writtenLogFileSizeMap; - return this; - } - public Builder withPartitionPath(String partitionPath) { this.partitionPath = partitionPath; return this; @@ -128,10 +115,7 @@ public class HoodieRollbackStat implements Serializable { if (commandBlocksCount == null) { commandBlocksCount = Collections.EMPTY_MAP; } - if (writtenLogFileSizeMap == null) { - writtenLogFileSizeMap = Collections.EMPTY_MAP; - } - return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount, writtenLogFileSizeMap); + return new HoodieRollbackStat(partitionPath, successDeleteFiles, failedDeleteFiles, commandBlocksCount); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index c7086c1e0..7c9b7cc80 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -495,24 +495,25 @@ public class FSUtils { } /** - * Get the latest log file written from the list of log files passed in. 
+ * Get the latest log file for the passed-in file-id in the partition path. */ - public static Option getLatestLogFile(Stream logFiles) { - return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator())); + public static Option getLatestLogFile(FileSystem fs, Path partitionPath, String fileId, + String logFileExtension, String baseCommitTime) throws IOException { + return getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); } /** - * Get all the log files for the passed in FileId in the partition path. + * Get all the log files for the passed-in file-id in the partition path. */ public static Stream getAllLogFiles(FileSystem fs, Path partitionPath, final String fileId, final String logFileExtension, final String baseCommitTime) throws IOException { try { - return Arrays - .stream(fs.listStatus(partitionPath, - path -> path.getName().startsWith("." + fileId) && path.getName().contains(logFileExtension))) - .map(HoodieLogFile::new).filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); + PathFilter pathFilter = path -> path.getName().startsWith("."
+ fileId) && path.getName().contains(logFileExtension); + return Arrays.stream(fs.listStatus(partitionPath, pathFilter)) + .map(HoodieLogFile::new) + .filter(s -> s.getBaseCommitTime().equals(baseCommitTime)); } catch (FileNotFoundException e) { - return Stream.builder().build(); + return Stream.of(); } } @@ -787,4 +788,8 @@ public class FSUtils { public interface SerializableFunction extends Function, Serializable { } + + private static Option getLatestLogFile(Stream logFiles) { + return Option.fromJavaOptional(logFiles.min(HoodieLogFile.getReverseLogFileComparator())); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index c1e8cbf08..d693d91f6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -18,10 +18,6 @@ package org.apache.hudi.common.model; -import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; - import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.PropertyAccessor; @@ -29,6 +25,9 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java index 32e42ee58..723c594ff 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java @@ -77,10 +77,8 @@ public class TimelineMetadataUtils { for (HoodieRollbackStat stat : rollbackStats) { Map rollbackLogFiles = stat.getCommandBlocksCount().keySet().stream() .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen)); - Map probableLogFiles = stat.getWrittenLogFileSizeMap().keySet().stream() - .collect(Collectors.toMap(f -> f.getPath().toString(), FileStatus::getLen)); HoodieRollbackPartitionMetadata metadata = new HoodieRollbackPartitionMetadata(stat.getPartitionPath(), - stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles, probableLogFiles); + stat.getSuccessDeleteFiles(), stat.getFailedDeleteFiles(), rollbackLogFiles); partitionMetadataBuilder.put(stat.getPartitionPath(), metadata); totalDeleted += stat.getSuccessDeleteFiles().size(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 58d63a194..bb29e4236 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -194,7 +194,8 @@ public class HoodieTableMetadataUtil { * @param partitionToDeletedFiles The {@code Map} to fill with files deleted per partition. * @param partitionToAppendedFiles The {@code Map} to fill with files appended per partition and their sizes. 
*/ - private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, HoodieRollbackMetadata rollbackMetadata, + private static void processRollbackMetadata(HoodieActiveTimeline metadataTableTimeline, + HoodieRollbackMetadata rollbackMetadata, Map> partitionToDeletedFiles, Map> partitionToAppendedFiles, Option lastSyncTs) { @@ -264,17 +265,6 @@ public class HoodieTableMetadataUtil { partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn); }); } - - if (pm.getWrittenLogFiles() != null && !pm.getWrittenLogFiles().isEmpty()) { - if (!partitionToAppendedFiles.containsKey(partition)) { - partitionToAppendedFiles.put(partition, new HashMap<>()); - } - - // Extract appended file name from the absolute paths saved in getWrittenLogFiles() - pm.getWrittenLogFiles().forEach((path, size) -> { - partitionToAppendedFiles.get(partition).merge(new Path(path).getName(), size, fileMergeFn); - }); - } }); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 939729501..22ceb5bfe 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -270,8 +270,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { List rollbacks = new ArrayList<>(); rollbacks.add(new HoodieInstant(false, actionType, commitTs)); - HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap(), - Collections.EMPTY_MAP); + HoodieRollbackStat rollbackStat = new HoodieRollbackStat(partition, deletedFiles, Collections.emptyList(), Collections.emptyMap()); List rollbackStats = new ArrayList<>(); rollbackStats.add(rollbackStat); return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats); diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 0bcebaf71..a9c9db303 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -556,7 +556,7 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness { boolean isRestore) throws IOException { Map> partititonToFiles = deleteFiles(files); List rollbackStats = partititonToFiles.entrySet().stream().map(e -> - new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>(), new HashMap<>()) + new HoodieRollbackStat(e.getKey(), e.getValue(), new ArrayList<>(), new HashMap<>()) ).collect(Collectors.toList()); List rollbacks = new ArrayList<>(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 7b8148a61..c55a389e2 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -354,7 +354,6 @@ public class HoodieTestTable { rollbackPartitionMetadata.setPartitionPath(entry.getKey()); rollbackPartitionMetadata.setSuccessDeleteFiles(entry.getValue()); rollbackPartitionMetadata.setFailedDeleteFiles(new ArrayList<>()); - rollbackPartitionMetadata.setWrittenLogFiles(getWrittenLogFiles(instantTimeToDelete, entry)); long rollbackLogFileSize = 50 + RANDOM.nextInt(500); String fileId = UUID.randomUUID().toString(); String logFileName = logFileName(instantTimeToDelete, fileId, 0);