diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java index d55295503..7655cf93f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java @@ -21,7 +21,6 @@ package org.apache.hudi.table.functional; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; -import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -150,11 +149,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction void testRollbackWithDeltaAndCompactionCommit(boolean rollbackUsingMarkers) throws Exception { // NOTE: First writer will have Metadata table DISABLED HoodieWriteConfig.Builder cfgBuilder = - getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE) - .withMetadataConfig( - HoodieMetadataConfig.newBuilder() - .enable(false) - .build()); + getConfigBuilder(false, rollbackUsingMarkers, HoodieIndex.IndexType.SIMPLE); addConfigsForPopulateMetaFields(cfgBuilder, true); HoodieWriteConfig cfg = cfgBuilder.build(); @@ -209,7 +204,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction final String commitTime1 = "002"; // WriteClient with custom config (disable small file handling) // NOTE: Second writer will have Metadata table ENABLED - try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(false));) { + try (SparkRDDWriteClient secondClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) { secondClient.startCommitWithTime(commitTime1); List copyOfRecords = new ArrayList<>(records); @@ -245,8 +240,8 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction /* * Write 3 (inserts + updates - testing successful delta commit) */ - final String commitTime2 = "002"; - try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(cfg);) { + final String commitTime2 = "003"; + try (SparkRDDWriteClient thirdClient = getHoodieWriteClient(getHoodieWriteConfigWithSmallFileHandlingOff(true));) { thirdClient.startCommitWithTime(commitTime2); List copyOfRecords = new ArrayList<>(records); @@ -287,7 +282,7 @@ public class TestHoodieSparkMergeOnReadTableRollback extends SparkClientFunction /* * Write 4 (updates) */ - newCommitTime = "003"; + newCommitTime = "004"; thirdClient.startCommitWithTime(newCommitTime); writeStatusJavaRDD = thirdClient.upsert(writeRecords, newCommitTime); diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 4782070e3..f3253e48a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -23,7 +23,6 @@ import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CommitUtils; import org.apache.hudi.common.util.Option; @@ -337,11 +336,9 @@ public class StreamWriteOperatorCoordinator } private void startInstant() { - final String instant = HoodieActiveTimeline.createNewInstantTime(); // put the assignment in front of metadata generation, // because the instant request from write task is asynchronous. - this.instant = instant; - this.writeClient.startCommitWithTime(instant, tableState.commitAction); + this.instant = this.writeClient.startCommit(); this.metaClient.getActiveTimeline().transitionRequestedToInflight(tableState.commitAction, this.instant); LOG.info("Create instant [{}] for table [{}] with type [{}]", this.instant, this.conf.getString(FlinkOptions.TABLE_NAME), conf.getString(FlinkOptions.TABLE_TYPE)); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 4744fbb6b..602ab16a7 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -123,18 +123,22 @@ public class RequestHandler { String timelineHashFromClient = ctx.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, ""); HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline().filterCompletedAndCompactionInstants(); + String localLastKnownInstant = localTimeline.lastInstant().isPresent() ? localTimeline.lastInstant().get().getTimestamp() + : HoodieTimeline.INVALID_INSTANT_TS; if (LOG.isDebugEnabled()) { LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient + "], localTimeline=" + localTimeline.getInstants().collect(Collectors.toList())); } - if ((localTimeline.getInstants().count() == 0) + if ((!localTimeline.getInstants().findAny().isPresent()) && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) { return false; } String localTimelineHash = localTimeline.getTimelineHash(); - if (!localTimelineHash.equals(timelineHashFromClient)) { + // refresh if timeline hash mismatches and if local's last known instant < client's last known instant + if (!localTimelineHash.equals(timelineHashFromClient) + && HoodieTimeline.compareTimestamps(localLastKnownInstant, HoodieTimeline.LESSER_THAN, lastKnownInstantFromClient)) { return true; }