From 4787076c6d87488c0087ac0cfb7fa86596b1357a Mon Sep 17 00:00:00 2001 From: Balaji Varadarajan Date: Tue, 13 Aug 2019 17:13:30 -0700 Subject: [PATCH] HUDI-204 : Make MOR rollback idempotent and disable using rolling stats for small file selection (#833) --- .../hudi/table/HoodieMergeOnReadTable.java | 92 ++++++------------- .../hudi/table/TestMergeOnReadTable.java | 25 ++++- 2 files changed, 52 insertions(+), 65 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java index ad0424b18..e34a12454 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieMergeOnReadTable.java @@ -44,8 +44,6 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordPayload; -import org.apache.hudi.common.model.HoodieRollingStat; -import org.apache.hudi.common.model.HoodieRollingStatMetadata; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.SyncableFileSystemView; @@ -60,7 +58,6 @@ import org.apache.hudi.common.util.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCompactionException; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieUpsertException; @@ -334,27 +331,6 @@ public class HoodieMergeOnReadTable extends super.finalizeWrite(jsc, instantTs, stats); } - @Override - protected HoodieRollingStatMetadata getRollingStats() { - try { - Option lastInstant = 
this.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() - .lastInstant(); - if (lastInstant.isPresent()) { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( - this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); - Option lastRollingStat = Option.ofNullable(commitMetadata.getExtraMetadata() - .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY)); - if (lastRollingStat.isPresent()) { - return HoodieCommitMetadata - .fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class); - } - } - return null; - } catch (IOException e) { - throw new HoodieException(); - } - } - /** * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet * files to larger ones without the need for an index in the logFile. @@ -438,18 +414,6 @@ public class HoodieMergeOnReadTable extends } private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { - if (rollingStatMetadata != null) { - Map partitionRollingStats = - rollingStatMetadata.getPartitionToRollingStats().get(partitionPath); - if (partitionRollingStats != null) { - HoodieRollingStat rollingStatForFile = partitionRollingStats.get(fileSlice.getFileId()); - if (rollingStatForFile != null) { - long inserts = rollingStatForFile.getInserts(); - return averageRecordSize * inserts; - } - } - } - // In case Rolling Stats is not present, fall back to sizing log files based on heuristics if (!fileSlice.getDataFile().isPresent()) { return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); } else { @@ -506,35 +470,37 @@ public class HoodieMergeOnReadTable extends }).forEach(wStat -> { Writer writer = null; String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); - boolean success = false; - try { - writer = HoodieLogFormat.newWriterBuilder().onParentPath( - FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), 
partitionPath)) - .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime) - .withFs(this.metaClient.getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); - // generate metadata - Map header = generateHeader(commit); - // if update belongs to an existing log file - writer = writer.appendBlock(new HoodieCommandBlock(header)); - success = true; - } catch (IOException | InterruptedException io) { - throw new HoodieRollbackException( - "Failed to rollback for commit " + commit, io); - } finally { + if (null != baseCommitTime) { + boolean success = false; try { - if (writer != null) { - writer.close(); + writer = HoodieLogFormat.newWriterBuilder().onParentPath( + FSUtils.getPartitionPath(this.getMetaClient().getBasePath(), partitionPath)) + .withFileId(wStat.getFileId()).overBaseCommit(baseCommitTime) + .withFs(this.metaClient.getFs()) + .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + // generate metadata + Map header = generateHeader(commit); + // if update belongs to an existing log file + writer = writer.appendBlock(new HoodieCommandBlock(header)); + success = true; + } catch (IOException | InterruptedException io) { + throw new HoodieRollbackException( + "Failed to rollback for commit " + commit, io); + } finally { + try { + if (writer != null) { + writer.close(); + } + if (success) { + // This step is intentionally done after writer is closed. Guarantees that + // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in + // cloud-storage : HUDI-168 + filesToNumBlocksRollback.put(this.getMetaClient().getFs() + .getFileStatus(writer.getLogFile().getPath()), 1L); + } + } catch (IOException io) { + throw new UncheckedIOException(io); } - if (success) { - // This step is intentionally done after writer is closed. 
Guarantees that - // getFileStatus would reflect correct stats and FileNotFoundException is not thrown in - // cloud-storage : HUDI-168 - filesToNumBlocksRollback.put(this.getMetaClient().getFs() - .getFileStatus(writer.getLogFile().getPath()), 1L); - } - } catch (IOException io) { - throw new UncheckedIOException(io); } } }); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java index 98a5d621a..93c1f720b 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMergeOnReadTable.java @@ -1024,11 +1024,27 @@ public class TestMergeOnReadTable { statuses = writeClient.insert(recordsRDD, newCommitTime); writeClient.commit(newCommitTime, statuses); - // rollback a successful commit // Sleep for small interval (at least 1 second) to force a new rollback start time. Thread.sleep(1000); + + // We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs + // and calling rollback twice + final String lastCommitTime = newCommitTime; + HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); + HoodieInstant last = + metaClient.getCommitsTimeline().getInstants().filter(instant -> instant.getTimestamp().equals(lastCommitTime)) + .findFirst().get(); + String fileName = last.getFileName(); + // Save the .commit file to local directory. + // Rollback will be called twice to test the case where rollback failed first time and retried. 
+ // We got the "BaseCommitTime cannot be null" exception before the fix
+ TemporaryFolder folder = new TemporaryFolder();
+ folder.create();
+ File file = folder.newFile();
+ metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime);
- final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+
+ metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
@@ -1042,6 +1058,11 @@ public class TestMergeOnReadTable {
fileSlice.getLogFiles().count() > 0).count();
}
Assert.assertTrue(numLogFiles == 0);
+ metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
+ new Path(metaClient.getMetaPath(), fileName));
+ Thread.sleep(1000);
+ // Rollback again to pretend the first rollback failed partially. This should not error out
+ writeClient.rollback(newCommitTime);
}
@Test