Add option to control use hsync or not
This commit is contained in:
@@ -117,7 +117,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
if (this.writer == null) {
|
if (this.writer == null) {
|
||||||
return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
|
return HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
|
||||||
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
|
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
|
||||||
.withFs(metaClient.getFs()).overBaseCommit("").build();
|
.withFs(metaClient.getFs()).overBaseCommit("").withUseHSync(config.getUseHSync()).build();
|
||||||
} else {
|
} else {
|
||||||
return this.writer;
|
return this.writer;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -481,6 +481,12 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
.sinceVersion("0.11.0")
|
.sinceVersion("0.11.0")
|
||||||
.withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services.");
|
.withDocumentation("Auto adjust lock configurations when metadata table is enabled and for async table services.");
|
||||||
|
|
||||||
|
public static final ConfigProperty<Boolean> USE_HSYNC = ConfigProperty
|
||||||
|
.key("hoodie.write.use.hsync")
|
||||||
|
.defaultValue(true)
|
||||||
|
.sinceVersion("0.12.0")
|
||||||
|
.withDocumentation("Use hsync or not");
|
||||||
|
|
||||||
private ConsistencyGuardConfig consistencyGuardConfig;
|
private ConsistencyGuardConfig consistencyGuardConfig;
|
||||||
private FileSystemRetryConfig fileSystemRetryConfig;
|
private FileSystemRetryConfig fileSystemRetryConfig;
|
||||||
|
|
||||||
@@ -2063,6 +2069,10 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
|
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Boolean getUseHSync() {
|
||||||
|
return getBooleanOrDefault(USE_HSYNC);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Are any table services configured to run inline for both scheduling and execution?
|
* Are any table services configured to run inline for both scheduling and execution?
|
||||||
*
|
*
|
||||||
@@ -2542,6 +2552,11 @@ public class HoodieWriteConfig extends HoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withUseHSync(boolean useHSync) {
|
||||||
|
writeConfig.setValue(USE_HSYNC, String.valueOf(useHSync));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
protected void setDefaults() {
|
protected void setDefaults() {
|
||||||
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
|
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
|
||||||
// Check for mandatory properties
|
// Check for mandatory properties
|
||||||
|
|||||||
@@ -477,7 +477,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
|
|||||||
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
|
.withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
|
||||||
.withRolloverLogWriteToken(writeToken)
|
.withRolloverLogWriteToken(writeToken)
|
||||||
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
|
.withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||||
|
.withUseHSync(config.getUseHSync()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -716,7 +716,8 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
|
|||||||
.withFs(dataMetaClient.getFs())
|
.withFs(dataMetaClient.getFs())
|
||||||
.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
|
.withRolloverLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
|
||||||
.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
|
.withLogWriteToken(HoodieLogFormat.DEFAULT_WRITE_TOKEN)
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||||
|
.withUseHSync(dataWriteConfig.getUseHSync()).build();
|
||||||
writer.appendBlock(block);
|
writer.appendBlock(block);
|
||||||
writer.close();
|
writer.close();
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|||||||
@@ -129,7 +129,8 @@ public class BaseRollbackHelper implements Serializable {
|
|||||||
.withFileId(fileId)
|
.withFileId(fileId)
|
||||||
.overBaseCommit(latestBaseInstant)
|
.overBaseCommit(latestBaseInstant)
|
||||||
.withFs(metaClient.getFs())
|
.withFs(metaClient.getFs())
|
||||||
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
|
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
|
||||||
|
.withUseHSync(config.getUseHSync()).build();
|
||||||
|
|
||||||
// generate metadata
|
// generate metadata
|
||||||
if (doDelete) {
|
if (doDelete) {
|
||||||
|
|||||||
@@ -144,6 +144,8 @@ public interface HoodieLogFormat {
|
|||||||
// Rollover Log file write token
|
// Rollover Log file write token
|
||||||
private String rolloverLogWriteToken;
|
private String rolloverLogWriteToken;
|
||||||
|
|
||||||
|
private Boolean useHSync;
|
||||||
|
|
||||||
public WriterBuilder withBufferSize(int bufferSize) {
|
public WriterBuilder withBufferSize(int bufferSize) {
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
return this;
|
return this;
|
||||||
@@ -204,6 +206,11 @@ public interface HoodieLogFormat {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public WriterBuilder withUseHSync(boolean useHSync) {
|
||||||
|
this.useHSync = useHSync;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Writer build() throws IOException {
|
public Writer build() throws IOException {
|
||||||
LOG.info("Building HoodieLogFormat Writer");
|
LOG.info("Building HoodieLogFormat Writer");
|
||||||
if (fs == null) {
|
if (fs == null) {
|
||||||
@@ -264,7 +271,10 @@ public interface HoodieLogFormat {
|
|||||||
if (sizeThreshold == null) {
|
if (sizeThreshold == null) {
|
||||||
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
|
sizeThreshold = DEFAULT_SIZE_THRESHOLD;
|
||||||
}
|
}
|
||||||
return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken);
|
if (useHSync == null) {
|
||||||
|
useHSync = true;
|
||||||
|
}
|
||||||
|
return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, rolloverLogWriteToken, useHSync);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,15 +58,18 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
private transient Thread shutdownThread = null;
|
private transient Thread shutdownThread = null;
|
||||||
|
|
||||||
|
private final boolean useHSync;
|
||||||
|
|
||||||
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
|
private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
|
||||||
|
|
||||||
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken) {
|
HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize, Short replication, Long sizeThreshold, String rolloverLogWriteToken, Boolean useHSync) {
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.logFile = logFile;
|
this.logFile = logFile;
|
||||||
this.sizeThreshold = sizeThreshold;
|
this.sizeThreshold = sizeThreshold;
|
||||||
this.bufferSize = bufferSize;
|
this.bufferSize = bufferSize;
|
||||||
this.replication = replication;
|
this.replication = replication;
|
||||||
this.rolloverLogWriteToken = rolloverLogWriteToken;
|
this.rolloverLogWriteToken = rolloverLogWriteToken;
|
||||||
|
this.useHSync = useHSync;
|
||||||
addShutDownHook();
|
addShutDownHook();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -258,8 +261,10 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
|
|||||||
output.flush();
|
output.flush();
|
||||||
// NOTE : the following API call makes sure that the data is flushed to disk on DataNodes (akin to POSIX fsync())
|
// NOTE : the following API call makes sure that the data is flushed to disk on DataNodes (akin to POSIX fsync())
|
||||||
// See more details here : https://issues.apache.org/jira/browse/HDFS-744
|
// See more details here : https://issues.apache.org/jira/browse/HDFS-744
|
||||||
|
if (useHSync) {
|
||||||
output.hsync();
|
output.hsync();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getCurrentSize() throws IOException {
|
public long getCurrentSize() throws IOException {
|
||||||
|
|||||||
Reference in New Issue
Block a user