1
0

[HUDI-2761] Fixing timeline server for repeated refreshes (#4812)

* Fixing timeline server for repeated refreshes
This commit is contained in:
Sivabalan Narayanan
2022-03-04 21:04:16 -05:00
committed by GitHub
parent 0986d5a01d
commit 6a46130037
3 changed files with 12 additions and 16 deletions

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.table.functional;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -150,11 +149,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception {
// NOTE: First writer will have Metadata table DISABLED
HoodieWriteConfig.Builder cfgBuilder =
getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE)
.withMetadataConfig(
HoodieMetadataConfig.newBuilder()
.enable(false)
.build());
getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE);
addConfigsForPopulateMetaFields(cfgBuilder, true);
HoodieWriteConfig cfg = cfgBuilder.build();
@@ -209,7 +204,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
final String commitTime1 = "002";
// WriteClient with custom config (disable small file handling)
// NOTE: Second writer will have Metadata table ENABLED
try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) {
try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
secondClient.startCommitWithTime(commitTime1);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -245,8 +240,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
/*
* Write 3 (inserts + updates - testing successful delta commit)
*/
final String commitTime2 = "002";
try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) {
final String commitTime2 = "003";
try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) {
thirdClient.startCommitWithTime(commitTime2);
List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
@@ -287,7 +282,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction
/*
* Write 4 (updates)
*/
newCommitTime = "003";
newCommitTime = "004";
thirdClient.startCommitWithTime(newCommitTime);
writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime);

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
@@ -337,11 +336,9 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
final String instant = HoodieActiveTimeline.createNewInstantTime();
// put the assignment in front of metadata generation,
// because the instant request from write task is asynchronous.
this.instant = instant;
this.writeClient.startCommitWithTime(instant, tableState.commitAction);
this.instant = this.writeClient.startCommit();
this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant);
LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant,
this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE));

View File

@@ -123,18 +123,22 @@ public class RequestHandler {
String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, "");
HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants();
String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp()
: HoodieTimeline.INVALID_INSTANT_TS;
if (LOG.isDebugEnabled()) {
LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
+ "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList()));
}
if ((localTimeline.getInstants().count() == 0)
if ((!localTimeline.getInstants().findAny().isPresent())
&& HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
return false;
}
String localTimelineHash = localTimeline.getTimelineHash();
if (!localTimelineHash.equals(timelineHashFromClient)) {
// refresh if timeline hash mismatches and if local's last known instant < client's last known instant
if (!localTimelineHash.equals(timelineHashFromClient)
&& HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) {
return true;
}