diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 2992f4abd..61b3efbb7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -117,7 +117,7 @@ public class HoodieTimelineArchiver { if (this.writer == null) { return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent()) .withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION) - .withFs(metaClient.getFs()).overBaseCommit("").build(); + .withFs(metaClient.getFs()).overBaseCommit("").withUseHSync(config.getUseHSync()).build(); } else { return this.writer; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index a983413f6..27730b925 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -481,6 +481,12 @@ public class HoodieWriteConfig extends HoodieConfig { .sinceVersion("0.11.0") .withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services."); + public static final ConfigProperty USE_HSYNC = ConfigProperty + .key("hoodie.write.use.hsync") + .defaultValue(true) + .sinceVersion("0.12.0") + .withDocumentation("Use hsync or not"); + private ConsistencyGuardConfig consistencyGuardConfig; private FileSystemRetryConfig fileSystemRetryConfig; @@ -2063,6 +2069,10 @@ public class HoodieWriteConfig extends HoodieConfig { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } + public Boolean getUseHSync() { + return getBooleanOrDefault(USE_HSYNC); + } + /** * Are any table services configured to run inline for both scheduling and execution? * @@ -2542,6 +2552,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withUseHSync(boolean useHSync) { + writeConfig.setValue(USE_HSYNC, String.valueOf(useHSync)); + return this; + } + protected void setDefaults() { writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType)); // Check for mandatory properties diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 426e20f83..50a0184a3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -477,7 +477,8 @@ public class HoodieAppendHandle extends .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs) .withRolloverLogWriteToken(writeToken) .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withUseHSync(config.getUseHSync()).build(); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index c7cc50967..d34d1fcf8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -716,7 +716,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta .withFs(dataMetaClient.getFs()) .withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN) .withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withUseHSync(dataWriteConfig.getUseHSync()).build(); writer.appendBlock(block); writer.close(); } catch (InterruptedException e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java index 8d5e76730..31b1e318a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackHelper.java @@ -129,7 +129,8 @@ public class BaseRollbackHelper implements Serializable { .withFileId(fileId) .overBaseCommit(latestBaseInstant) .withFs(metaClient.getFs()) - .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); + .withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withUseHSync(config.getUseHSync()).build(); // generate metadata if (doDelete) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java index 569b4a23b..5b506b125 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java @@ -144,6 +144,8 @@ public interface HoodieLogFormat { // Rollover Log file write token private String rolloverLogWriteToken; + private Boolean useHSync; + public WriterBuilder withBufferSize(int bufferSize) { this.bufferSize = bufferSize; return this; @@ -204,6 +206,11 @@ public interface HoodieLogFormat { return this; } + public WriterBuilder withUseHSync(boolean useHSync) { + this.useHSync = useHSync; + return this; + } + public Writer build() throws IOException { LOG.info("Building HoodieLogFormat Writer"); if (fs == null) { @@ -264,7 +271,10 @@ public interface HoodieLogFormat { if (sizeThreshold == null) { sizeThreshold = DEFAULT_SIZE_THRESHOLD; } - return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken); + if (useHSync == null) { + useHSync = true; + } + return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken, useHSync); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java index 8dbe85efd..50baab034 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java @@ -58,15 +58,18 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { private boolean closed = false; private transient Thread shutdownThread = null; + private final boolean useHSync; + private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet"; - HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken) { + HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken, Boolean useHSync) { this.fs = fs; this.logFile = logFile; this.sizeThreshold = sizeThreshold; this.bufferSize = bufferSize; this.replication = replication; this.rolloverLogWriteToken = rolloverLogWriteToken; + this.useHSync = useHSync; addShutDownHook(); } @@ -258,7 +261,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer { output.flush(); // NOTE : the following API call makes sure that the data is flushed to disk on DataNodes (akin to POSIX fsync()) // See more details here : https://issues.apache.org/jira/browse/HDFS-744 - output.hsync(); + if (useHSync) { + output.hsync(); + } } @Override