1
0

HUDI-204 : Make MOR rollback idempotent and disable using rolling stats for small file selection (#833)

This commit is contained in:
Balaji Varadarajan
2019-08-13 17:13:30 -07:00
committed by vinoth chandar
parent 8d37fbf0db
commit 4787076c6d
2 changed files with 52 additions and 65 deletions

View File

@@ -44,8 +44,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTimeline; import org.apache.hudi.common.table.HoodieTimeline;
import org.apache.hudi.common.table.SyncableFileSystemView; import org.apache.hudi.common.table.SyncableFileSystemView;
@@ -60,7 +58,6 @@ import org.apache.hudi.common.util.FSUtils;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException; import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.exception.HoodieUpsertException;
@@ -334,27 +331,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
super.finalizeWrite(jsc, instantTs, stats); super.finalizeWrite(jsc, instantTs, stats);
} }
@Override
protected HoodieRollingStatMetadata getRollingStats() {
try {
Option<HoodieInstant> lastInstant = this.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.lastInstant();
if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
Option<String> lastRollingStat = Option.ofNullable(commitMetadata.getExtraMetadata()
.get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
if (lastRollingStat.isPresent()) {
return HoodieCommitMetadata
.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class);
}
}
return null;
} catch (IOException e) {
throw new HoodieException();
}
}
/** /**
* UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet * UpsertPartitioner for MergeOnRead table type, this allows auto correction of small parquet
* files to larger ones without the need for an index in the logFile. * files to larger ones without the need for an index in the logFile.
@@ -438,18 +414,6 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
} }
private long getTotalFileSize(String partitionPath, FileSlice fileSlice) { private long getTotalFileSize(String partitionPath, FileSlice fileSlice) {
if (rollingStatMetadata != null) {
Map<String, HoodieRollingStat> partitionRollingStats =
rollingStatMetadata.getPartitionToRollingStats().get(partitionPath);
if (partitionRollingStats != null) {
HoodieRollingStat rollingStatForFile = partitionRollingStats.get(fileSlice.getFileId());
if (rollingStatForFile != null) {
long inserts = rollingStatForFile.getInserts();
return averageRecordSize * inserts;
}
}
}
// In case Rolling Stats is not present, fall back to sizing log files based on heuristics
if (!fileSlice.getDataFile().isPresent()) { if (!fileSlice.getDataFile().isPresent()) {
return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList())); return convertLogFilesSizeToExpectedParquetSize(fileSlice.getLogFiles().collect(Collectors.toList()));
} else { } else {
@@ -506,6 +470,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
}).forEach(wStat -> { }).forEach(wStat -> {
Writer writer = null; Writer writer = null;
String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId()); String baseCommitTime = fileIdToBaseCommitTimeForLogMap.get(wStat.getFileId());
if (null != baseCommitTime) {
boolean success = false; boolean success = false;
try { try {
writer = HoodieLogFormat.newWriterBuilder().onParentPath( writer = HoodieLogFormat.newWriterBuilder().onParentPath(
@@ -537,6 +502,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
throw new UncheckedIOException(io); throw new UncheckedIOException(io);
} }
} }
}
}); });
return HoodieRollbackStat.newBuilder() return HoodieRollbackStat.newBuilder()
.withPartitionPath(partitionPath) .withPartitionPath(partitionPath)

View File

@@ -1024,11 +1024,27 @@ public class TestMergeOnReadTable {
statuses = writeClient.insert(recordsRDD, newCommitTime); statuses = writeClient.insert(recordsRDD, newCommitTime);
writeClient.commit(newCommitTime, statuses); writeClient.commit(newCommitTime, statuses);
// rollback a successful commit
// Sleep for small interval (at least 1 second) to force a new rollback start time. // Sleep for small interval (at least 1 second) to force a new rollback start time.
Thread.sleep(1000); Thread.sleep(1000);
// We will test HUDI-204 here. We will simulate rollback happening twice by copying the commit file to local fs
// and calling rollback twice
final String lastCommitTime = newCommitTime;
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieInstant last =
metaClient.getCommitsTimeline().getInstants().filter(instant -> instant.getTimestamp().equals(lastCommitTime))
.findFirst().get();
String fileName = last.getFileName();
// Save the .commit file to local directory.
// Rollback will be called twice to test the case where rollback failed the first time and was retried.
// We got the "BaseCommitTime cannot be null" exception before the fix
TemporaryFolder folder = new TemporaryFolder();
folder.create();
File file = folder.newFile();
metaClient.getFs().copyToLocalFile(new Path(metaClient.getMetaPath(), fileName), new Path(file.getAbsolutePath()));
writeClient.rollback(newCommitTime); writeClient.rollback(newCommitTime);
final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
RealtimeView tableRTFileSystemView = table.getRTFileSystemView(); RealtimeView tableRTFileSystemView = table.getRTFileSystemView();
@@ -1042,6 +1058,11 @@ public class TestMergeOnReadTable {
fileSlice.getLogFiles().count() > 0).count(); fileSlice.getLogFiles().count() > 0).count();
} }
Assert.assertTrue(numLogFiles == 0); Assert.assertTrue(numLogFiles == 0);
metaClient.getFs().copyFromLocalFile(new Path(file.getAbsolutePath()),
new Path(metaClient.getMetaPath(), fileName));
Thread.sleep(1000);
// Rollback again to pretend the first rollback failed partially. This should not error out
writeClient.rollback(newCommitTime);
} }
@Test @Test