[HUDI-4279] Strength the remote fs view lagging check when latest commit refresh is enabled (#5917)
Signed-off-by: LinMingQiang <1356469429@qq.com>
This commit is contained in:
@@ -116,6 +116,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
|||||||
public static final String FILEID_PARAM = "fileid";
|
public static final String FILEID_PARAM = "fileid";
|
||||||
public static final String LAST_INSTANT_TS = "lastinstantts";
|
public static final String LAST_INSTANT_TS = "lastinstantts";
|
||||||
public static final String TIMELINE_HASH = "timelinehash";
|
public static final String TIMELINE_HASH = "timelinehash";
|
||||||
|
public static final String NUM_INSTANTS = "numinstants";
|
||||||
public static final String REFRESH_OFF = "refreshoff";
|
public static final String REFRESH_OFF = "refreshoff";
|
||||||
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
|
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
|
||||||
|
|
||||||
@@ -162,6 +163,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
|
|||||||
// Adding mandatory parameters - Last instants affecting file-slice
|
// Adding mandatory parameters - Last instants affecting file-slice
|
||||||
timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
|
timeline.lastInstant().ifPresent(instant -> builder.addParameter(LAST_INSTANT_TS, instant.getTimestamp()));
|
||||||
builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
|
builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
|
||||||
|
builder.addParameter(NUM_INSTANTS, timeline.countInstants() + "");
|
||||||
|
|
||||||
String url = builder.toString();
|
String url = builder.toString();
|
||||||
LOG.info("Sending request : (" + url + ")");
|
LOG.info("Sending request : (" + url + ")");
|
||||||
|
|||||||
@@ -121,10 +121,9 @@ public class RequestHandler {
|
|||||||
String lastKnownInstantFromClient =
|
String lastKnownInstantFromClient =
|
||||||
ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS);
|
ctx.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS);
|
||||||
String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
|
String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
|
||||||
|
String numInstantsFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.NUM_INSTANTS, "-1");
|
||||||
HoodieTimeline localTimeline =
|
HoodieTimeline localTimeline =
|
||||||
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
|
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
|
||||||
String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
|
|
||||||
: HoodieTimeline.INVALID_INSTANT_TS;
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
|
LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
|
||||||
+ "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
|
+ "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
|
||||||
@@ -138,7 +137,8 @@ public class RequestHandler {
|
|||||||
String localTimelineHash = localTimeline.getTimelineHash();
|
String localTimelineHash = localTimeline.getTimelineHash();
|
||||||
// refresh if timeline hash mismatches and if local's last known instant < client's last known instant (if config is enabled)
|
// refresh if timeline hash mismatches and if local's last known instant < client's last known instant (if config is enabled)
|
||||||
if (!localTimelineHash.equals(timelineHashFromClient)
|
if (!localTimelineHash.equals(timelineHashFromClient)
|
||||||
&& (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit || HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient))) {
|
&& (!timelineServiceConfig.refreshTimelineBasedOnLatestCommit
|
||||||
|
|| localTimelineBehind(localTimeline, lastKnownInstantFromClient, numInstantsFromClient))) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -146,6 +146,22 @@ public class RequestHandler {
|
|||||||
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
|
return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean localTimelineBehind(HoodieTimeline localTimeline, String lastKnownInstantFromClient, String numInstantsFromClient) {
|
||||||
|
String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
|
||||||
|
: HoodieTimeline.INVALID_INSTANT_TS;
|
||||||
|
// Why comparing the num commits ?
|
||||||
|
// Assumes there are 4 commits on the timeline:
|
||||||
|
// timestamp(action): ts_0(commit), ts_1(commit), ts_2(clean), ts_3(commit)
|
||||||
|
// when ts_1 is in INFLIGHT state, ts_2 clean action is already finished,
|
||||||
|
// after ts_1 triggers #sync, the local timeline is refreshed as [ts_0, ts_2],
|
||||||
|
// when ts_1 switches state from INFLIGHT to COMPLETED, no #sync triggers.
|
||||||
|
// at ts_3, when the fs view snapshot is requested, the ts_3 client timeline should be [ts_0, ts_1, ts_2],
|
||||||
|
// if we only compare the latest commit, the local timeline is NOT behind, but the fs view is not complete
|
||||||
|
// because ts_1 is lost.
|
||||||
|
return HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)
|
||||||
|
|| localTimeline.countInstants() < Integer.parseInt(numInstantsFromClient);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Syncs data-set view if local view is behind.
|
* Syncs data-set view if local view is behind.
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user