From e79fbc07fe803bc51cdf4f11948b133bbaa70595 Mon Sep 17 00:00:00 2001 From: Udit Mehrotra Date: Fri, 31 Jul 2020 20:10:28 -0700 Subject: [PATCH] [HUDI-1054] Several performance fixes during finalizing writes (#1768) Co-authored-by: Udit Mehrotra --- .../commands/TestArchivedCommitsCommand.java | 2 +- .../hudi/cli/commands/TestCommitsCommand.java | 2 +- .../apache/hudi/client/HoodieWriteClient.java | 4 +- .../apache/hudi/config/HoodieWriteConfig.java | 13 +++ .../org/apache/hudi/table/HoodieTable.java | 17 +-- .../hudi/table/HoodieTimelineArchiveLog.java | 13 +-- .../org/apache/hudi/table/MarkerFiles.java | 100 +++++++++++++----- .../rollback/BaseRollbackActionExecutor.java | 2 +- .../hudi/io/TestHoodieTimelineArchiveLog.java | 14 +-- .../apache/hudi/table/TestMarkerFiles.java | 16 ++- 10 files changed, 130 insertions(+), 53 deletions(-) diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java index 313c1bcd3..4c7ce8819 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestArchivedCommitsCommand.java @@ -92,7 +92,7 @@ public class TestArchivedCommitsCommand extends AbstractShellIntegrationTest { // archive HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, hadoopConf); - archiveLog.archiveIfRequired(); + archiveLog.archiveIfRequired(jsc); } @AfterEach diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java index 45c340df2..44e2b8097 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCommitsCommand.java @@ -176,7 +176,7 @@ public class TestCommitsCommand extends AbstractShellIntegrationTest { // archive metaClient = HoodieTableMetaClient.reload(HoodieCLI.getTableMetaClient()); HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(cfg, jsc.hadoopConfiguration()); - archiveLog.archiveIfRequired(); + archiveLog.archiveIfRequired(jsc); CommandResult cr = getShell().executeCommand(String.format("commits showarchived --startTs %s --endTs %s", "100", "104")); assertTrue(cr.isSuccess()); diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index b2ad315b5..9782b46b6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -337,7 +337,7 @@ public class HoodieWriteClient extends AbstractHo try { // Delete the marker directory for the instant. - new MarkerFiles(table, instantTime).quietDeleteMarkerDir(); + new MarkerFiles(table, instantTime).quietDeleteMarkerDir(jsc, config.getMarkersDeleteParallelism()); // Do an inline compaction if enabled if (config.isInlineCompaction()) { @@ -349,7 +349,7 @@ public class HoodieWriteClient extends AbstractHo } // We cannot have unbounded commit files. Archive commits if we have to archive HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(config, hadoopConf); - archiveLog.archiveIfRequired(); + archiveLog.archiveIfRequired(jsc); autoCleanOnCommit(instantTime); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 9aecdf707..69758f251 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -88,6 +88,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { public static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName(); public static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism"; public static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM; + public static final String MARKERS_DELETE_PARALLELISM = "hoodie.markers.delete.parallelism"; + public static final String DEFAULT_MARKERS_DELETE_PARALLELISM = "100"; public static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode"; public static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT .toString(); @@ -235,6 +237,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM)); } + public int getMarkersDeleteParallelism() { + return Integer.parseInt(props.getProperty(MARKERS_DELETE_PARALLELISM)); + } + public boolean isEmbeddedTimelineServerEnabled() { return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED)); } @@ -830,6 +836,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return this; } + public Builder withMarkersDeleteParallelism(int parallelism) { + props.setProperty(MARKERS_DELETE_PARALLELISM, String.valueOf(parallelism)); + return this; + } + public Builder withEmbeddedTimelineServerEnabled(boolean enabled) { props.setProperty(EMBEDDED_TIMELINE_SERVER_ENABLED, String.valueOf(enabled)); return this; @@ -874,6 +885,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { DEFAULT_HOODIE_WRITE_STATUS_CLASS); setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM), FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM); + setDefaultOnCondition(props, !props.containsKey(MARKERS_DELETE_PARALLELISM), MARKERS_DELETE_PARALLELISM, + DEFAULT_MARKERS_DELETE_PARALLELISM); setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED), EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED); setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP), diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 748091e1d..d8b0c6e90 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -70,6 +70,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -428,21 +429,21 @@ public abstract class HoodieTable implements Seri } // we are not including log appends here, since they are already fail-safe. - List invalidDataPaths = markers.createdAndMergedDataPaths(); - List validDataPaths = stats.stream() + Set invalidDataPaths = markers.createdAndMergedDataPaths(jsc, config.getFinalizeWriteParallelism()); + Set validDataPaths = stats.stream() .map(HoodieWriteStat::getPath) .filter(p -> p.endsWith(this.getBaseFileExtension())) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); + // Contains list of partially created files. These needs to be cleaned up. invalidDataPaths.removeAll(validDataPaths); + if (!invalidDataPaths.isEmpty()) { LOG.info("Removing duplicate data files created due to spark retries before committing. Paths=" + invalidDataPaths); - } - Map>> invalidPathsByPartition = invalidDataPaths.stream() - .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString())) - .collect(Collectors.groupingBy(Pair::getKey)); + Map>> invalidPathsByPartition = invalidDataPaths.stream() + .map(dp -> Pair.of(new Path(dp).getParent().toString(), new Path(basePath, dp).toString())) + .collect(Collectors.groupingBy(Pair::getKey)); - if (!invalidPathsByPartition.isEmpty()) { // Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS. // Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit if (consistencyCheckEnabled) { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java index 98d3e05fc..4be00a3a5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java @@ -54,6 +54,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; import java.io.FileNotFoundException; import java.io.IOException; @@ -121,7 +122,7 @@ public class HoodieTimelineArchiveLog { /** * Check if commits need to be archived. If yes, archive commits. */ - public boolean archiveIfRequired() throws IOException { + public boolean archiveIfRequired(JavaSparkContext jsc) throws IOException { try { List instantsToArchive = getInstantsToArchive().collect(Collectors.toList()); @@ -129,7 +130,7 @@ public class HoodieTimelineArchiveLog { if (!instantsToArchive.isEmpty()) { this.writer = openWriter(); LOG.info("Archiving instants " + instantsToArchive); - archive(instantsToArchive); + archive(jsc, instantsToArchive); LOG.info("Deleting archived instants " + instantsToArchive); success = deleteArchivedInstants(instantsToArchive); } else { @@ -267,7 +268,7 @@ public class HoodieTimelineArchiveLog { return success; } - public void archive(List instants) throws HoodieCommitException { + public void archive(JavaSparkContext jsc, List instants) throws HoodieCommitException { try { HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants(); Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema(); @@ -275,7 +276,7 @@ public class HoodieTimelineArchiveLog { List records = new ArrayList<>(); for (HoodieInstant hoodieInstant : instants) { try { - deleteAnyLeftOverMarkerFiles(hoodieInstant); + deleteAnyLeftOverMarkerFiles(jsc, hoodieInstant); records.add(convertToAvroRecord(commitTimeline, hoodieInstant)); if (records.size() >= this.config.getCommitArchivalBatchSize()) { writeToFile(wrapperSchema, records); @@ -293,9 +294,9 @@ public class HoodieTimelineArchiveLog { } } - private void deleteAnyLeftOverMarkerFiles(HoodieInstant instant) { + private void deleteAnyLeftOverMarkerFiles(JavaSparkContext jsc, HoodieInstant instant) { MarkerFiles markerFiles = new MarkerFiles(table, instant.getTimestamp()); - if (markerFiles.deleteMarkerDir()) { + if (markerFiles.deleteMarkerDir(jsc, config.getMarkersDeleteParallelism())) { LOG.info("Cleaned up left over marker directory for instant :" + instant); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java index 00eb7df7a..8a310fd30 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/MarkerFiles.java @@ -18,8 +18,12 @@ package org.apache.hudi.table; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.ValidationUtils; @@ -28,26 +32,27 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.io.IOType; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; import java.io.IOException; +import java.io.Serializable; +import java.util.Arrays; import java.util.ArrayList; -import java.util.LinkedList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * Operates on marker files for a given write action (commit, delta commit, compaction). */ -public class MarkerFiles { +public class MarkerFiles implements Serializable { private static final Logger LOG = LogManager.getLogger(MarkerFiles.class); - public static String stripMarkerSuffix(String path) { - return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN)); - } - private final String instantTime; - private final FileSystem fs; - private final Path markerDirPath; + private final transient FileSystem fs; + private final transient Path markerDirPath; private final String basePath; public MarkerFiles(FileSystem fs, String basePath, String markerFolderPath, String instantTime) { @@ -64,9 +69,9 @@ public class MarkerFiles { instantTime); } - public void quietDeleteMarkerDir() { + public void quietDeleteMarkerDir(JavaSparkContext jsc, int parallelism) { try { - deleteMarkerDir(); + deleteMarkerDir(jsc, parallelism); } catch (HoodieIOException ioe) { LOG.warn("Error deleting marker directory for instant " + instantTime, ioe); } @@ -74,34 +79,77 @@ public class MarkerFiles { /** * Delete Marker directory corresponding to an instant. + * + * @param jsc Java Spark Context. + * @param parallelism Spark parallelism for deletion. */ - public boolean deleteMarkerDir() { + public boolean deleteMarkerDir(JavaSparkContext jsc, int parallelism) { try { - boolean result = fs.delete(markerDirPath, true); - if (result) { + if (fs.exists(markerDirPath)) { + FileStatus[] fileStatuses = fs.listStatus(markerDirPath); + List markerDirSubPaths = Arrays.stream(fileStatuses) + .map(fileStatus -> fileStatus.getPath().toString()) + .collect(Collectors.toList()); + + if (markerDirSubPaths.size() > 0) { + SerializableConfiguration conf = new SerializableConfiguration(fs.getConf()); + parallelism = Math.min(markerDirSubPaths.size(), parallelism); + jsc.parallelize(markerDirSubPaths, parallelism).foreach(subPathStr -> { + Path subPath = new Path(subPathStr); + FileSystem fileSystem = subPath.getFileSystem(conf.get()); + fileSystem.delete(subPath, true); + }); + } + + boolean result = fs.delete(markerDirPath, true); LOG.info("Removing marker directory at " + markerDirPath); - } else { - LOG.info("No marker directory to delete at " + markerDirPath); + return result; } - return result; } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); } + return false; } public boolean doesMarkerDirExist() throws IOException { return fs.exists(markerDirPath); } - public List createdAndMergedDataPaths() throws IOException { - List dataFiles = new LinkedList<>(); - FSUtils.processFiles(fs, markerDirPath.toString(), (status) -> { - String pathStr = status.getPath().toString(); - if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { - dataFiles.add(translateMarkerToDataPath(pathStr)); + public Set createdAndMergedDataPaths(JavaSparkContext jsc, int parallelism) throws IOException { + Set dataFiles = new HashSet<>(); + + FileStatus[] topLevelStatuses = fs.listStatus(markerDirPath); + List subDirectories = new ArrayList<>(); + for (FileStatus topLevelStatus: topLevelStatuses) { + if (topLevelStatus.isFile()) { + String pathStr = topLevelStatus.getPath().toString(); + if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { + dataFiles.add(translateMarkerToDataPath(pathStr)); + } + } else { + subDirectories.add(topLevelStatus.getPath().toString()); } - return true; - }, false); + } + + if (subDirectories.size() > 0) { + parallelism = Math.min(subDirectories.size(), parallelism); + SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf()); + dataFiles.addAll(jsc.parallelize(subDirectories, parallelism).flatMap(directory -> { + Path path = new Path(directory); + FileSystem fileSystem = path.getFileSystem(serializedConf.get()); + RemoteIterator itr = fileSystem.listFiles(path, true); + List result = new ArrayList<>(); + while (itr.hasNext()) { + FileStatus status = itr.next(); + String pathStr = status.getPath().toString(); + if (pathStr.contains(HoodieTableMetaClient.MARKER_EXTN) && !pathStr.endsWith(IOType.APPEND.name())) { + result.add(translateMarkerToDataPath(pathStr)); + } + } + return result.iterator(); + }).collect()); + } + return dataFiles; } @@ -110,6 +158,10 @@ public class MarkerFiles { return MarkerFiles.stripMarkerSuffix(rPath); } + public static String stripMarkerSuffix(String path) { + return path.substring(0, path.indexOf(HoodieTableMetaClient.MARKER_EXTN)); + } + public List allMarkerFilePaths() throws IOException { List markerFiles = new ArrayList<>(); FSUtils.processFiles(fs, markerDirPath.toString(), fileStatus -> { diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java index 846e8a853..90b9bb387 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java @@ -113,7 +113,7 @@ public abstract class BaseRollbackActionExecutor extends BaseActionExecutor archivedInstants = Arrays.asList(instant1, instant2, instant3); diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java index 723d9e17a..af679cee8 100644 --- a/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java +++ b/hudi-client/src/test/java/org/apache/hudi/table/TestMarkerFiles.java @@ -28,6 +28,9 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.IOType; +import org.apache.hudi.testutils.HoodieClientTestUtils; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -45,16 +48,23 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { private MarkerFiles markerFiles; private FileSystem fs; private Path markerFolderPath; + private JavaSparkContext jsc; @BeforeEach public void setup() throws IOException { initPath(); initMetaClient(); + this.jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest(TestMarkerFiles.class.getName())); this.fs = FSUtils.getFs(metaClient.getBasePath(), metaClient.getHadoopConf()); this.markerFolderPath = new Path(metaClient.getMarkerFolderPath("000")); this.markerFiles = new MarkerFiles(fs, metaClient.getBasePath(), markerFolderPath.toString(), "000"); } + @AfterEach + public void cleanup() { + jsc.stop(); + } + private void createSomeMarkerFiles() { markerFiles.create("2020/06/01", "file1", IOType.MERGE); markerFiles.create("2020/06/02", "file2", IOType.APPEND); @@ -97,7 +107,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { // then assertTrue(markerFiles.doesMarkerDirExist()); - assertTrue(markerFiles.deleteMarkerDir()); + assertTrue(markerFiles.deleteMarkerDir(jsc, 2)); assertFalse(markerFiles.doesMarkerDirExist()); } @@ -105,7 +115,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { public void testDeletionWhenMarkerDirNotExists() throws IOException { // then assertFalse(markerFiles.doesMarkerDirExist()); - assertFalse(markerFiles.deleteMarkerDir()); + assertFalse(markerFiles.deleteMarkerDir(jsc, 2)); } @Test @@ -120,7 +130,7 @@ public class TestMarkerFiles extends HoodieCommonTestHarness { // then assertIterableEquals(CollectionUtils.createImmutableList( "2020/06/01/file1", "2020/06/03/file3"), - markerFiles.createdAndMergedDataPaths().stream().sorted().collect(Collectors.toList()) + markerFiles.createdAndMergedDataPaths(jsc, 2).stream().sorted().collect(Collectors.toList()) ); }