1
0

[HUDI-3576] Configuring timeline refreshes based on latest commit (#4973)

This commit is contained in:
Sivabalan Narayanan
2022-03-07 17:01:49 -05:00
committed by GitHub
parent 53826d69e4
commit 29040762fa
4 changed files with 22 additions and 3 deletions

View File

@@ -78,7 +78,8 @@ public class EmbeddedTimelineService {
.serverPort(writeConfig.getEmbeddedTimelineServerPort()) .serverPort(writeConfig.getEmbeddedTimelineServerPort())
.numThreads(writeConfig.getEmbeddedTimelineServerThreads()) .numThreads(writeConfig.getEmbeddedTimelineServerThreads())
.compress(writeConfig.getEmbeddedTimelineServerCompressOutput()) .compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
.async(writeConfig.getEmbeddedTimelineServerUseAsync()); .async(writeConfig.getEmbeddedTimelineServerUseAsync())
.refreshTimelineBasedOnLatestCommit(writeConfig.isRefreshTimelineServerBasedOnLatestCommit());
// Only passing marker-related write configs to timeline server // Only passing marker-related write configs to timeline server
// if timeline-server-based markers are used. // if timeline-server-based markers are used.
if (writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) { if (writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {

View File

@@ -338,6 +338,11 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. " .withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if such archiving fails."); + "Controls whether or not, the write should be failed as well, if such archiving fails.");
/**
 * Write config that is forwarded to the timeline service to decide when its local timeline is refreshed.
 * Defaults to {@code false}.
 */
public static final ConfigProperty<Boolean> REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
    .key("hoodie.refresh.timeline.server.based.on.latest.commit")
    .defaultValue(false)
    .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), "
        + "the timeline is refreshed whenever the timeline hash known to the timeline server differs from the one sent by the client. "
        + "When enabled, the refresh additionally requires the timeline server's last known instant to be older than the client's last known instant.");
public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms") .key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(2000L) .defaultValue(2000L)
@@ -1038,6 +1043,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE); return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE);
} }
/**
 * Reads the {@link #REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT} setting from this config.
 *
 * @return the boolean value resolved for that property
 */
public boolean isRefreshTimelineServerBasedOnLatestCommit() {
  final boolean refreshOnLatestCommit = getBoolean(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT);
  return refreshOnLatestCommit;
}
public int getMaxConsistencyChecks() { public int getMaxConsistencyChecks() {
return getInt(MAX_CONSISTENCY_CHECKS); return getInt(MAX_CONSISTENCY_CHECKS);
} }

View File

@@ -136,9 +136,9 @@ public class RequestHandler {
} }
String localTimelineHash = localTimeline.getTimelineHash(); String localTimelineHash = localTimeline.getTimelineHash();
// refresh if timeline hash mismatches and if local's last known instant < client's last known instant // refresh if timeline hash mismatches and if local's last known instant < client's last known instant (if config is enabled)
if (!localTimelineHash.equals(timelineHashFromClient) if (!localTimelineHash.equals(timelineHashFromClient)
&& HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) { && (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit || HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient))) {
return true; return true;
} }

View File

@@ -123,6 +123,9 @@ public class TimelineService {
@Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files") @Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
public int markerParallelism = 100; public int markerParallelism = 100;
// Command-line switch telling the request handler to also compare last known instants
// (not only timeline hashes) before refreshing the local timeline.
// NOTE(review): this default is true, while the Builder field of the same name and the
// write-config property both default to false — confirm the difference is intended.
// NOTE(review): presumably a no-value boolean flag that already defaults to true cannot be
// switched off from the command line without an explicit arity — verify against JCommander docs.
@Parameter(names = {"--refreshTimelineBasedOnLatestCommit"}, description = "Refresh local timeline based on latest commit in addition to timeline hash value")
public boolean refreshTimelineBasedOnLatestCommit = true;
@Parameter(names = {"--help", "-h"}) @Parameter(names = {"--help", "-h"})
public Boolean help = false; public Boolean help = false;
@@ -147,6 +150,7 @@ public class TimelineService {
private int markerBatchNumThreads = 20; private int markerBatchNumThreads = 20;
private long markerBatchIntervalMs = 50L; private long markerBatchIntervalMs = 50L;
private int markerParallelism = 100; private int markerParallelism = 100;
private boolean refreshTimelineBasedOnLatestCommit = false;
public Builder() { public Builder() {
} }
@@ -196,6 +200,11 @@ public class TimelineService {
return this; return this;
} }
/**
 * Sets whether the timeline refresh should also take the latest commit into account.
 *
 * @param refreshTimelineBasedOnLatestCommit value to store on this builder
 * @return this builder, so calls can be chained
 */
public Builder refreshTimelineBasedOnLatestCommit(boolean refreshTimelineBasedOnLatestCommit) {
this.refreshTimelineBasedOnLatestCommit = refreshTimelineBasedOnLatestCommit;
return this;
}
public Builder enableMarkerRequests(boolean enableMarkerRequests) { public Builder enableMarkerRequests(boolean enableMarkerRequests) {
this.enableMarkerRequests = enableMarkerRequests; this.enableMarkerRequests = enableMarkerRequests;
return this; return this;