1
0

[HUDI-4448] Remove the latest commit refresh for timeline server (#6179)

This commit is contained in:
Danny Chan
2022-07-24 07:10:53 +08:00
committed by GitHub
parent 2d745057ea
commit a0ffd05b77
6 changed files with 4 additions and 56 deletions

View File

@@ -78,8 +78,7 @@ public class EmbeddedTimelineService {
.serverPort(writeConfig.getEmbeddedTimelineServerPort())
.numThreads(writeConfig.getEmbeddedTimelineServerThreads())
.compress(writeConfig.getEmbeddedTimelineServerCompressOutput())
.async(writeConfig.getEmbeddedTimelineServerUseAsync())
.refreshTimelineBasedOnLatestCommit(writeConfig.isRefreshTimelineServerBasedOnLatestCommit());
.async(writeConfig.getEmbeddedTimelineServerUseAsync());
// Only passing marker-related write configs to timeline server
// if timeline-server-based markers are used.
if (writeConfig.getMarkersType() == MarkerType.TIMELINE_SERVER_BASED) {

View File

@@ -359,11 +359,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Timeline archiving removes older instants from the timeline, after each write operation, to minimize metadata overhead. "
+ "Controls whether or not, the write should be failed as well, if such archiving fails.");
public static final ConfigProperty<Boolean> REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
.key("hoodie.refresh.timeline.server.based.on.latest.commit")
.defaultValue(true)
.withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true).");
public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
.key("hoodie.consistency.check.initial_interval_ms")
.defaultValue(2000L)
@@ -1105,10 +1100,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(FAIL_ON_TIMELINE_ARCHIVING_ENABLE);
}
/**
 * Returns the boolean value of the {@code REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT}
 * property as read from this config via {@code getBoolean}.
 *
 * @return the configured value of {@code REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT}
 */
public boolean isRefreshTimelineServerBasedOnLatestCommit() {
return getBoolean(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT);
}
/**
 * Returns the integer value of the {@code MAX_CONSISTENCY_CHECKS} property as read
 * from this config via {@code getInt}.
 *
 * @return the configured value of {@code MAX_CONSISTENCY_CHECKS}
 */
public int getMaxConsistencyChecks() {
return getInt(MAX_CONSISTENCY_CHECKS);
}
@@ -2514,11 +2505,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
/**
 * Sets the {@code REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT} property on the write
 * config being built.
 *
 * @param refreshTimelineServerBasedOnLatestCommit the flag value to store
 * @return this builder, for call chaining
 */
public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) {
// The value is stored in its string form ("true"/"false").
writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit));
return this;
}
protected void setDefaults() {
writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
// Check for mandatory properties

View File

@@ -515,13 +515,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
return getConfigBuilder(schema)
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withAvroSchemaValidate(true)
// The test leaves rollback instants on the timeline.
// Rollback instants use the real (current) time as their instant time, which is always
// greater than the instant time of the normal commits. This breaks the refresh rule
// introduced in HUDI-2761, because the client's last instant is then always the rollback
// instant rather than a normal commit.
// Therefore, always refresh the timeline whenever the client and server timelines differ.
.withRefreshTimelineServerBasedOnLatestCommit(false);
.withAvroSchemaValidate(true);
}
@Override

View File

@@ -116,7 +116,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
public static final String FILEID_PARAM = "fileid";
public static final String LAST_INSTANT_TS = "lastinstantts";
public static final String TIMELINE_HASH = "timelinehash";
public static final String NUM_INSTANTS = "numinstants";
public static final String REFRESH_OFF = "refreshoff";
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
@@ -163,7 +162,6 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
// Adding mandatory parameters - Last instants affecting file-slice
timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
builder.addParameter(NUM_INSTANTS, timeline.countInstants() + "");
String url = builder.toString();
LOG.info("Sending request : (" + url + ")");

View File

@@ -121,7 +121,6 @@ public class RequestHandler {
String lastKnownInstantFromClient =
ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS);
String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
String numInstantsFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.NUM_INSTANTS, "-1");
HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
if (LOG.isDebugEnabled()) {
@@ -135,10 +134,8 @@ public class RequestHandler {
}
String localTimelineHash = localTimeline.getTimelineHash();
// Refresh when the timeline hash mismatches and, if refreshTimelineBasedOnLatestCommit is enabled, the local timeline is also behind the client's (older last instant or fewer instants)
if (!localTimelineHash.equals(timelineHashFromClient)
&& (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit
|| localTimelineBehind(localTimeline, lastKnownInstantFromClient, numInstantsFromClient))) {
// refresh if timeline hash mismatches
if (!localTimelineHash.equals(timelineHashFromClient)) {
return true;
}
@@ -146,22 +143,6 @@ public class RequestHandler {
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
}
/**
 * Checks whether the local timeline lags behind the timeline described by the client.
 *
 * <p>The local timeline counts as behind when either its last instant timestamp is
 * {@code LESSER_THAN} the client's last known instant, or it holds fewer instants than
 * the client reported.
 *
 * @param localTimeline the timeline held locally
 * @param lastKnownInstantFromClient timestamp of the last instant known to the client
 * @param numInstantsFromClient number of instants on the client timeline, as a decimal string
 * @return {@code true} if the local timeline is behind the client's
 * @throws NumberFormatException if {@code numInstantsFromClient} is not a parsable integer
 *     and the timestamp comparison did not already return {@code true}
 */
private static boolean localTimelineBehind(HoodieTimeline localTimeline, String lastKnownInstantFromClient, String numInstantsFromClient) {
// Fall back to INVALID_INSTANT_TS when the local timeline has no instants.
String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
: HoodieTimeline.INVALID_INSTANT_TS;
// Why also compare the number of instants?
// Assume there are 4 instants on the timeline:
//   timestamp(action): ts_0(commit), ts_1(commit), ts_2(clean), ts_3(commit)
// While ts_1 is still INFLIGHT, the ts_2 clean action has already finished.
// After ts_1 triggers #sync, the local timeline is refreshed to [ts_0, ts_2].
// When ts_1 later switches from INFLIGHT to COMPLETED, no #sync is triggered.
// At ts_3, when the fs view snapshot is requested, the client timeline is [ts_0, ts_1, ts_2].
// Comparing only the latest instant would say the local timeline is NOT behind,
// yet the fs view would be incomplete because ts_1 is missing.
return HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)
|| localTimeline.countInstants() < Integer.parseInt(numInstantsFromClient);
}
/**
* Syncs data-set view if local view is behind.
*/

View File

@@ -123,9 +123,6 @@ public class TimelineService {
@Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
public int markerParallelism = 100;
@Parameter(names = {"--refreshTimelineBasedOnLatestCommit"}, description = "Refresh local timeline based on latest commit in addition to timeline hash value")
public boolean refreshTimelineBasedOnLatestCommit = true;
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
@@ -150,7 +147,6 @@ public class TimelineService {
private int markerBatchNumThreads = 20;
private long markerBatchIntervalMs = 50L;
private int markerParallelism = 100;
private boolean refreshTimelineBasedOnLatestCommit = true;
public Builder() {
}
@@ -200,11 +196,6 @@ public class TimelineService {
return this;
}
/**
 * Sets the {@code refreshTimelineBasedOnLatestCommit} flag held by this builder.
 *
 * @param refreshTimelineBasedOnLatestCommit the flag value to store
 * @return this builder, for call chaining
 */
public Builder refreshTimelineBasedOnLatestCommit(boolean refreshTimelineBasedOnLatestCommit) {
this.refreshTimelineBasedOnLatestCommit = refreshTimelineBasedOnLatestCommit;
return this;
}
public Builder enableMarkerRequests(boolean enableMarkerRequests) {
this.enableMarkerRequests = enableMarkerRequests;
return this;
@@ -240,7 +231,6 @@ public class TimelineService {
config.markerBatchNumThreads = this.markerBatchNumThreads;
config.markerBatchIntervalMs = this.markerBatchIntervalMs;
config.markerParallelism = this.markerParallelism;
config.refreshTimelineBasedOnLatestCommit = this.refreshTimelineBasedOnLatestCommit;
return config;
}
}