1
0

[HUDI-1054] Several performance fixes during finalizing writes (#1768)

Co-authored-by: Udit Mehrotra <uditme@amazon.com>
This commit is contained in:
Udit Mehrotra
2020-07-31 20:10:28 -07:00
committed by GitHub
parent 727f1df62c
commit e79fbc07fe
10 changed files with 130 additions and 53 deletions

View File

@@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest {
// archive // archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
archiveLog.archiveIfRequired(); archiveLog.archiveIfRequired(jsc);
} }
@AfterEach @AfterEach

View File

@@ -176,7 +176,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest {
// archive // archive
metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient());
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration()); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration());
archiveLog.archiveIfRequired(); archiveLog.archiveIfRequired(jsc);
CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104"));
assertTrue(cr.isSuccess()); assertTrue(cr.isSuccess());

View File

@@ -337,7 +337,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
try { try {
// Delete the marker directory for the instant. // Delete the marker directory for the instant.
new MarkerFiles(table, instantTime).quietDeleteMarkerDir(); new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
// Do an inline compaction if enabled // Do an inline compaction if enabled
if (config.isInlineCompaction()) { if (config.isInlineCompaction()) {
@@ -349,7 +349,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
} }
// We cannot have unbounded commit files. Archive commits if we have to archive // We cannot have unbounded commit files. Archive commits if we have to archive
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf);
archiveLog.archiveIfRequired(); archiveLog.archiveIfRequired(jsc);
autoCleanOnCommit(instantTime); autoCleanOnCommit(instantTime);
} catch (IOException ioe) { } catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe); throw new HoodieIOException(ioe.getMessage(), ioe);

View File

@@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism";
public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100";
public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT
.toString(); .toString();
@@ -235,6 +237,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM)); return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
} }
public int getMarkersDeleteParallelism() {
return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM));
}
public boolean isEmbeddedTimelineServerEnabled() { public boolean isEmbeddedTimelineServerEnabled() {
return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED)); return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
} }
@@ -830,6 +836,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this; return this;
} }
public Builder withMarkersDeleteParallelism(int parallelism) {
props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism));
return this;
}
public Builder withEmbeddedTimelineServerEnabled(boolean enabled) { public Builder withEmbeddedTimelineServerEnabled(boolean enabled) {
props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled)); props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled));
return this; return this;
@@ -874,6 +885,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
DEFAULT_HOODIE_WRITE_STATUS_CLASS); DEFAULT_HOODIE_WRITE_STATUS_CLASS);
setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM, setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM,
DEFAULT_FINALIZE_WRITE_PARALLELISM); DEFAULT_FINALIZE_WRITE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM,
DEFAULT_MARKERS_DELETE_PARALLELISM);
setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED), setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED); EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP), setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),

View File

@@ -70,6 +70,7 @@ import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@@ -428,21 +429,21 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
} }
// we are not including log appends here, since they are already fail-safe. // we are not including log appends here, since they are already fail-safe.
List<String> invalidDataPaths = markers.createdAndMergedDataPaths(); Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism());
List<String> validDataPaths = stats.stream() Set<String> validDataPaths = stats.stream()
.map(HoodieWriteStat::getPath) .map(HoodieWriteStat::getPath)
.filter(p -> p.endsWith(this.getBaseFileExtension())) .filter(p -> p.endsWith(this.getBaseFileExtension()))
.collect(Collectors.toList()); .collect(Collectors.toSet());
// Contains list of partially created files. These needs to be cleaned up. // Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths); invalidDataPaths.removeAll(validDataPaths);
if (!invalidDataPaths.isEmpty()) { if (!invalidDataPaths.isEmpty()) {
LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths); LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths);
} Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream() .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString()))
.map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString())) .collect(Collectors.groupingBy(Pair::getKey));
.collect(Collectors.groupingBy(Pair::getKey));
if (!invalidPathsByPartition.isEmpty()) {
// Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS. // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
// Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
if (consistencyCheckEnabled) { if (consistencyCheckEnabled) {

View File

@@ -54,6 +54,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@@ -121,7 +122,7 @@ public class HoodieTimelineArchiveLog {
/** /**
* Check if commits need to be archived. If yes, archive commits. * Check if commits need to be archived. If yes, archive commits.
*/ */
public boolean archiveIfRequired() throws IOException { public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException {
try { try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
@@ -129,7 +130,7 @@ public class HoodieTimelineArchiveLog {
if (!instantsToArchive.isEmpty()) { if (!instantsToArchive.isEmpty()) {
this.writer = openWriter(); this.writer = openWriter();
LOG.info("Archiving instants " + instantsToArchive); LOG.info("Archiving instants " + instantsToArchive);
archive(instantsToArchive); archive(jsc, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive); LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive); success = deleteArchivedInstants(instantsToArchive);
} else { } else {
@@ -267,7 +268,7 @@ public class HoodieTimelineArchiveLog {
return success; return success;
} }
public void archive(List<HoodieInstant> instants) throws HoodieCommitException { public void archive(JavaSparkContext jsc, List<HoodieInstant> instants) throws HoodieCommitException {
try { try {
HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
@@ -275,7 +276,7 @@ public class HoodieTimelineArchiveLog {
List<IndexedRecord> records = new ArrayList<>(); List<IndexedRecord> records = new ArrayList<>();
for (HoodieInstant hoodieInstant : instants) { for (HoodieInstant hoodieInstant : instants) {
try { try {
deleteAnyLeftOverMarkerFiles(hoodieInstant); deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant);
records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
if (records.size() >= this.config.getCommitArchivalBatchSize()) { if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records); writeToFile(wrapperSchema, records);
@@ -293,9 +294,9 @@ public class HoodieTimelineArchiveLog {
} }
} }
private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) { private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) {
MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp()); MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp());
if (markerFiles.deleteMarkerDir()) { if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) {
LOG.info("Cleaned up left over marker directory for instant :" + instant); LOG.info("Cleaned up left over marker directory for instant :" + instant);
} }
} }

View File

@@ -18,8 +18,12 @@
package org.apache.hudi.table; package org.apache.hudi.table;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
@@ -28,26 +32,27 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.IOType; import org.apache.hudi.io.IOType;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedList; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/** /**
* Operates on marker files for a given write action (commit, delta commit, compaction). * Operates on marker files for a given write action (commit, delta commit, compaction).
*/ */
public class MarkerFiles { public class MarkerFiles implements Serializable {
private static final Logger LOG = LogManager.getLogger(MarkerFiles.class); private static final Logger LOG = LogManager.getLogger(MarkerFiles.class);
public static String stripMarkerSuffix(String path) {
return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
}
private final String instantTime; private final String instantTime;
private final FileSystem fs; private final transient FileSystem fs;
private final Path markerDirPath; private final transient Path markerDirPath;
private final String basePath; private final String basePath;
public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) { public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) {
@@ -64,9 +69,9 @@ public class MarkerFiles {
instantTime); instantTime);
} }
public void quietDeleteMarkerDir() { public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) {
try { try {
deleteMarkerDir(); deleteMarkerDir(jsc, parallelism);
} catch (HoodieIOException ioe) { } catch (HoodieIOException ioe) {
LOG.warn("Error deleting marker directory for instant " + instantTime, ioe); LOG.warn("Error deleting marker directory for instant " + instantTime, ioe);
} }
@@ -74,34 +79,77 @@ public class MarkerFiles {
/** /**
* Delete Marker directory corresponding to an instant. * Delete Marker directory corresponding to an instant.
*
* @param jsc Java Spark Context.
* @param parallelism Spark parallelism for deletion.
*/ */
public boolean deleteMarkerDir() { public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) {
try { try {
boolean result = fs.delete(markerDirPath, true); if (fs.exists(markerDirPath)) {
if (result) { FileStatus[] fileStatuses = fs.listStatus(markerDirPath);
List<String> markerDirSubPaths = Arrays.stream(fileStatuses)
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
if (markerDirSubPaths.size() > 0) {
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
parallelism = Math.min(markerDirSubPaths.size(), parallelism);
jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> {
Path subPath = new Path(subPathStr);
FileSystem fileSystem = subPath.getFileSystem(conf.get());
fileSystem.delete(subPath, true);
});
}
boolean result = fs.delete(markerDirPath, true);
LOG.info("Removing marker directory at " + markerDirPath); LOG.info("Removing marker directory at " + markerDirPath);
} else { return result;
LOG.info("No marker directory to delete at " + markerDirPath);
} }
return result;
} catch (IOException ioe) { } catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe); throw new HoodieIOException(ioe.getMessage(), ioe);
} }
return false;
} }
public boolean doesMarkerDirExist() throws IOException { public boolean doesMarkerDirExist() throws IOException {
return fs.exists(markerDirPath); return fs.exists(markerDirPath);
} }
public List<String> createdAndMergedDataPaths() throws IOException { public Set<String> createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException {
List<String> dataFiles = new LinkedList<>(); Set<String> dataFiles = new HashSet<>();
FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> {
String pathStr = status.getPath().toString(); FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath);
if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { List<String> subDirectories = new ArrayList<>();
dataFiles.add(translateMarkerToDataPath(pathStr)); for (FileStatus topLevelStatus: topLevelStatuses) {
if (topLevelStatus.isFile()) {
String pathStr = topLevelStatus.getPath().toString();
if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
dataFiles.add(translateMarkerToDataPath(pathStr));
}
} else {
subDirectories.add(topLevelStatus.getPath().toString());
} }
return true; }
}, false);
if (subDirectories.size() > 0) {
parallelism = Math.min(subDirectories.size(), parallelism);
SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> {
Path path = new Path(directory);
FileSystem fileSystem = path.getFileSystem(serializedConf.get());
RemoteIterator<LocatedFileStatus> itr = fileSystem.listFiles(path, true);
List<String> result = new ArrayList<>();
while (itr.hasNext()) {
FileStatus status = itr.next();
String pathStr = status.getPath().toString();
if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) {
result.add(translateMarkerToDataPath(pathStr));
}
}
return result.iterator();
}).collect());
}
return dataFiles; return dataFiles;
} }
@@ -110,6 +158,10 @@ public class MarkerFiles {
return MarkerFiles.stripMarkerSuffix(rPath); return MarkerFiles.stripMarkerSuffix(rPath);
} }
public static String stripMarkerSuffix(String path) {
return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN));
}
public List<String> allMarkerFilePaths() throws IOException { public List<String> allMarkerFilePaths() throws IOException {
List<String> markerFiles = new ArrayList<>(); List<String> markerFiles = new ArrayList<>();
FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> { FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> {

View File

@@ -113,7 +113,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor<Hood
} }
// Finally, remove the marker files post rollback. // Finally, remove the marker files post rollback.
new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(); new MarkerFiles(table, instantToRollback.getTimestamp()).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism());
return rollbackMetadata; return rollbackMetadata;
} }

View File

@@ -79,7 +79,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
.withParallelism(2, 2).forTable("test-trip-table").build(); .withParallelism(2, 2).forTable("test-trip-table").build();
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result); assertTrue(result);
} }
@@ -157,7 +157,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
metaClient = HoodieTableMetaClient.reload(metaClient); metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf);
assertTrue(archiveLog.archiveIfRequired()); assertTrue(archiveLog.archiveIfRequired(jsc));
// reload the timeline and remove the remaining commits // reload the timeline and remove the remaining commits
timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants(); timeline = metaClient.getActiveTimeline().reload().getAllCommitsTimeline().filterCompletedInstants();
@@ -246,7 +246,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match"); assertEquals(4, timeline.countInstants(), "Loaded 4 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result); assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5"); assertEquals(4, timeline.countInstants(), "Should not archive commits when maxCommitsToKeep is 5");
@@ -289,7 +289,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result); assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe"); assertTrue(timeline.containsOrBeforeTimelineStarts("100"), "Archived commits should always be safe");
@@ -315,7 +315,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match"); assertEquals(6, timeline.countInstants(), "Loaded 6 commits and the count should match");
assertTrue(archiveLog.archiveIfRequired()); assertTrue(archiveLog.archiveIfRequired(jsc));
timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants(); timeline = metaClient.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
assertEquals(5, timeline.countInstants(), assertEquals(5, timeline.countInstants(),
"Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)"); "Since we have a savepoint at 101, we should never archive any commit after 101 (we only archive 100)");
@@ -349,7 +349,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline(); HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match"); assertEquals(8, timeline.countInstants(), "Loaded 6 commits and the count should match");
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result); assertTrue(result);
timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline(); timeline = metaClient.getActiveTimeline().reload().getCommitsAndCompactionTimeline();
assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")), assertFalse(timeline.containsInstant(new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION, "100")),
@@ -397,7 +397,7 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf()); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, dfs.getConf());
boolean result = archiveLog.archiveIfRequired(); boolean result = archiveLog.archiveIfRequired(jsc);
assertTrue(result); assertTrue(result);
HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(); HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline();
List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3); List<HoodieInstant> archivedInstants = Arrays.asList(instant1, instant2, instant3);

View File

@@ -28,6 +28,9 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.io.IOType; import org.apache.hudi.io.IOType;
import org.apache.hudi.testutils.HoodieClientTestUtils;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -45,16 +48,23 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
private MarkerFiles markerFiles; private MarkerFiles markerFiles;
private FileSystem fs; private FileSystem fs;
private Path markerFolderPath; private Path markerFolderPath;
private JavaSparkContext jsc;
@BeforeEach @BeforeEach
public void setup() throws IOException { public void setup() throws IOException {
initPath(); initPath();
initMetaClient(); initMetaClient();
this.jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName()));
this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf());
this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000")); this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000"));
this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000"); this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000");
} }
@AfterEach
public void cleanup() {
jsc.stop();
}
private void createSomeMarkerFiles() { private void createSomeMarkerFiles() {
markerFiles.create("2020/06/01", "file1", IOType.MERGE); markerFiles.create("2020/06/01", "file1", IOType.MERGE);
markerFiles.create("2020/06/02", "file2", IOType.APPEND); markerFiles.create("2020/06/02", "file2", IOType.APPEND);
@@ -97,7 +107,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
// then // then
assertTrue(markerFiles.doesMarkerDirExist()); assertTrue(markerFiles.doesMarkerDirExist());
assertTrue(markerFiles.deleteMarkerDir()); assertTrue(markerFiles.deleteMarkerDir(jsc, 2));
assertFalse(markerFiles.doesMarkerDirExist()); assertFalse(markerFiles.doesMarkerDirExist());
} }
@@ -105,7 +115,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
public void testDeletionWhenMarkerDirNotExists() throws IOException { public void testDeletionWhenMarkerDirNotExists() throws IOException {
// then // then
assertFalse(markerFiles.doesMarkerDirExist()); assertFalse(markerFiles.doesMarkerDirExist());
assertFalse(markerFiles.deleteMarkerDir()); assertFalse(markerFiles.deleteMarkerDir(jsc, 2));
} }
@Test @Test
@@ -120,7 +130,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness {
// then // then
assertIterableEquals(CollectionUtils.createImmutableList( assertIterableEquals(CollectionUtils.createImmutableList(
"2020/06/01/file1", "2020/06/03/file3"), "2020/06/01/file1", "2020/06/03/file3"),
markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList()) markerFiles.createdAndMergedDataPaths(jsc, 2).stream().sorted().collect(Collectors.toList())
); );
} }