From 435ea1543c034194d7ca0b589b7b043fc49c07ac Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 24 Nov 2021 18:26:40 -0500 Subject: [PATCH] [HUDI-2793] Fixing deltastreamer checkpoint fetch/copy over (#4034) - Removed the checkpoint copy-over logic from TransactionUtils. Deltastreamer now walks back through previous commits to find the latest checkpoint value. --- .../hudi/client/utils/TransactionUtils.java | 52 +++------- .../apache/hudi/config/HoodieWriteConfig.java | 12 --- .../hudi/utils/TestTransactionUtils.java | 98 ------------------- .../utilities/deltastreamer/DeltaSync.java | 36 ++++--- .../HoodieDeltaStreamerTestBase.java | 26 +++++ .../functional/TestHoodieDeltaStreamer.java | 47 +++++++++ ...estHoodieDeltaStreamerWithMultiWriter.java | 51 +++++----- 7 files changed, 134 insertions(+), 188 deletions(-) delete mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 8e59478da..ed2ea4577 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -18,11 +18,8 @@ package org.apache.hudi.client.utils; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.hudi.client.transaction.ConflictResolutionStrategy; import org.apache.hudi.client.transaction.ConcurrentOperation; +import org.apache.hudi.client.transaction.ConflictResolutionStrategy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -34,9 +31,12 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieWriteConflictException; import org.apache.hudi.table.HoodieTable; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; + import java.io.IOException; +import java.util.Map; import java.util.stream.Stream; public class TransactionUtils { @@ -45,6 +45,7 @@ public class TransactionUtils { /** * Resolve any write conflicts when committing data. 
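* Candidate instants come from the configured {@link ConflictResolutionStrategy}; if a detected conflict cannot be resolved, a {@link HoodieWriteConflictException} is thrown.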
+ * * @param table * @param currentTxnOwnerInstant * @param thisCommitMetadata @@ -54,11 +55,11 @@ public class TransactionUtils { * @throws HoodieWriteConflictException */ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny( - final HoodieTable table, - final Option<HoodieInstant> currentTxnOwnerInstant, - final Option<HoodieCommitMetadata> thisCommitMetadata, - final HoodieWriteConfig config, - Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException { + final HoodieTable table, + final Option<HoodieInstant> currentTxnOwnerInstant, + final Option<HoodieCommitMetadata> thisCommitMetadata, + final HoodieWriteConfig config, + Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException { if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) { ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant); @@ -68,7 +69,7 @@ public class TransactionUtils { ConcurrentOperation otherOperation = new ConcurrentOperation(instant, table.getMetaClient()); if (resolutionStrategy.hasConflict(thisOperation, otherOperation)) { LOG.info("Conflict encountered between current instant = " + thisOperation + " and instant = " - + otherOperation + ", attempting to resolve it..."); + + otherOperation + ", attempting to resolve it..."); resolutionStrategy.resolveConflict(table, thisOperation, otherOperation); } } catch (IOException io) { @@ -77,10 +78,6 @@ }); LOG.info("Successfully resolved conflicts, if any"); - if (config.mergeDeltastreamerStateFromPreviousCommit()) { - mergeCheckpointStateFromPreviousCommit(table.getMetaClient(), thisOperation.getCommitMetadataOption()); - } - return thisOperation.getCommitMetadataOption(); } return thisCommitMetadata; @@ -88,6 +85,7 @@ /** * Get the last completed transaction hoodie instant and {@link HoodieCommitMetadata#getExtraMetadata()}. + * * @param metaClient * @return */ @@ -117,30 +115,4 @@ throw new HoodieIOException("Unable to read metadata for instant " + hoodieInstantOption.get(), io); } } - - protected static void mergeCheckpointStateFromPreviousCommit(HoodieTableMetaClient metaClient, Option<HoodieCommitMetadata> thisMetadata) { - overrideWithLatestCommitMetadata(metaClient, thisMetadata, Collections.singletonList(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY)); - } - - /** - * Generic method allowing us to override the current metadata with the metadata from - * the latest instant for the specified key prefixes. 
* @param metaClient * @param thisMetadata * @param keyPrefixes The key prefixes to merge from the previous commit */ - private static void overrideWithLatestCommitMetadata(HoodieTableMetaClient metaClient, - Option<HoodieCommitMetadata> thisMetadata, - List<String> keyPrefixes) { - if (keyPrefixes.size() == 1 && keyPrefixes.get(0).length() < 1) { - return; - } - Option<Pair<HoodieInstant, Map<String, String>>> lastInstant = getLastCompletedTxnInstantAndMetadata(metaClient); - if (lastInstant.isPresent() && thisMetadata.isPresent()) { - Stream<String> lastCommitMetadataKeys = lastInstant.get().getRight().keySet().stream(); - keyPrefixes.stream().forEach(keyPrefix -> lastCommitMetadataKeys - .filter(key -> key.startsWith(keyPrefix)) - .forEach(key -> thisMetadata.get().getExtraMetadata().put(key, lastInstant.get().getRight().get(key)))); - } - } } \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 17386e9e9..6652f5e0a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -378,14 +378,6 @@ public class HoodieWriteConfig extends HoodieConfig { + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed " + "if a conflict (writes affect the same file group) is detected."); - public static final ConfigProperty<Boolean> WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE = ConfigProperty - .key("hoodie.write.concurrency.merge.deltastreamer.state") - .defaultValue(false) - .withAlternatives("hoodie.write.meta.key.prefixes") - .withDocumentation("If enabled, this writer will merge Deltastreamer state from the previous checkpoint in order to allow both realtime " + "and batch writers to ingest into a single table. This should not be enabled on Deltastreamer writers. Enabling this config means," + "for a spark writer, deltastreamer checkpoint will be copied over from previous commit to the current one."); - /** * Currently we use this to specify the write schema. */ @@ -1778,10 +1770,6 @@ public class HoodieWriteConfig extends HoodieConfig { return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE)); } - public Boolean mergeDeltastreamerStateFromPreviousCommit() { - return getBoolean(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE); - } - public Boolean inlineTableServices() { return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java deleted file mode 100644 index 6a8369743..000000000 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestTransactionUtils.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hudi.utils; - -import org.apache.hadoop.fs.Path; -import org.apache.hudi.client.utils.TransactionUtils; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; -import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.testutils.HoodieCommonTestHarness; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.config.HoodieWriteConfig; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; - -public class TestTransactionUtils extends HoodieCommonTestHarness { - - @BeforeEach - public void setUp() throws Exception { - init(); - } - - public void init() throws Exception { - initPath(); - initMetaClient(); - metaClient.getFs().mkdirs(new Path(basePath)); - } - - @Test - public void testCheckpointStateMerge() throws IOException { - HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient); - - // Create completed commit with deltastreamer checkpoint state - HoodieInstant commitInstantWithCheckpointState = new HoodieInstant( - true, - HoodieTimeline.COMMIT_ACTION, - HoodieActiveTimeline.createNewInstantTime() - ); - timeline.createNewInstant(commitInstantWithCheckpointState); - - HoodieCommitMetadata metadataWithCheckpoint = new HoodieCommitMetadata(); - String checkpointVal = "00001"; - metadataWithCheckpoint.addMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, checkpointVal); - timeline.saveAsComplete( - commitInstantWithCheckpointState, - Option.of(metadataWithCheckpoint.toJsonString().getBytes(StandardCharsets.UTF_8)) - ); - - // Inflight commit without checkpoint metadata - HoodieInstant commitInstantWithoutCheckpointState = new HoodieInstant( - true, - HoodieTimeline.COMMIT_ACTION, - HoodieActiveTimeline.createNewInstantTime() - ); - timeline.createNewInstant(commitInstantWithoutCheckpointState); - HoodieCommitMetadata metadataWithoutCheckpoint = new HoodieCommitMetadata(); - - // Ensure that checkpoint state is merged in from previous completed commit - MockTransactionUtils.assertCheckpointStateWasMerged(metaClient, metadataWithoutCheckpoint, checkpointVal); - } - - private static class MockTransactionUtils extends TransactionUtils { - - public static void assertCheckpointStateWasMerged( - HoodieTableMetaClient metaClient, - HoodieCommitMetadata currentMetadata, - String expectedCheckpointState) { - TransactionUtils.mergeCheckpointStateFromPreviousCommit(metaClient, Option.of(currentMetadata)); - assertEquals( - expectedCheckpointState, - currentMetadata.getExtraMetadata().get(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY) - ); - } - } -} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 
b5239e929..e94c1e1eb 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -50,6 +50,7 @@ import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodiePayloadConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HiveSyncTool; import org.apache.hudi.keygen.KeyGenerator; @@ -106,7 +107,6 @@ import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT; import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT; import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT; -import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY; import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE; @@ -339,11 +339,17 @@ public class DeltaSync implements Serializable { resumeCheckpointStr = Option.empty(); } else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) { - throw new HoodieDeltaStreamerException( - "Unable to find previous checkpoint. Please double check if this table " - + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" - + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" - + commitMetadata.toJsonString()); + // If the previous commit metadata does not have the checkpoint key, walk back over earlier commits until we find one. + Option<String> prevCheckpoint = getPreviousCheckpoint(commitTimelineOpt.get()); + if (prevCheckpoint.isPresent()) { + resumeCheckpointStr = prevCheckpoint; + } else { + throw new HoodieDeltaStreamerException( + "Unable to find previous checkpoint. Please double check if this table " + + "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :" + + commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata=" + + commitMetadata.toJsonString()); + } } // KAFKA_CHECKPOINT_TYPE will be honored only for first batch. if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) { @@ -451,6 +457,18 @@ public class DeltaSync implements Serializable { return Pair.of(schemaProvider, Pair.of(checkpointStr, records)); } + protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException { + return timeline.getReverseOrderedInstants().map(instant -> { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + return Option.ofNullable(commitMetadata.getMetadata(CHECKPOINT_KEY)); + } catch (IOException e) { + throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e); + } + }).filter(Option::isPresent).findFirst().orElse(Option.empty()); + } + /** * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed. 
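* Note that the checkpoint a run resumes from is carried in each commit's extra metadata under CHECKPOINT_KEY; getPreviousCheckpoint() above scans for exactly that entry.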
* @@ -716,12 +734,6 @@ public class DeltaSync implements Serializable { String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), cfg.filterDupes)); ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(), String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert)); - ValidationUtils.checkArgument(!config.mergeDeltastreamerStateFromPreviousCommit(), - String.format( - "Deltastreamer processes should not merge state from previous deltastreamer commits. Please unset '%s'", - WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key()) - ); - return config; } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java index 043b0a4e0..06898db92 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/HoodieDeltaStreamerTestBase.java @@ -20,7 +20,14 @@ package org.apache.hudi.utilities.functional; import org.apache.hudi.DataSourceWriteOptions; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; import org.apache.hudi.hive.MultiPartKeysValueExtractor; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; import org.apache.hudi.utilities.testutils.UtilitiesTestBase; @@ -35,6 +42,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Map; import java.util.Random; public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { @@ -277,4 +287,20 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase { } } + static void addCommitToTimeline(HoodieTableMetaClient metaClient) throws IOException { + addCommitToTimeline(metaClient, Collections.emptyMap()); + } + + static void addCommitToTimeline(HoodieTableMetaClient metaClient, Map<String, String> extraMetadata) throws IOException { + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + commitMetadata.setOperationType(WriteOperationType.UPSERT); + extraMetadata.forEach((k, v) -> commitMetadata.getExtraMetadata().put(k, v)); + String commitTime = HoodieActiveTimeline.createNewInstantTime(); + metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, commitTime)); + metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime)); + metaClient.getActiveTimeline().saveAsComplete( + new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime), + Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8))); + } + } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 
227623eeb..30025405c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.functional; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.common.config.DFSPropertiesConfiguration; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; @@ -37,10 +38,12 @@ import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.hive.HiveSyncConfig; @@ -48,8 +51,10 @@ import org.apache.hudi.hive.HoodieHiveClient; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.HoodieClusteringJob; +import org.apache.hudi.utilities.deltastreamer.DeltaSync; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.hudi.utilities.schema.SchemaProvider; import org.apache.hudi.utilities.schema.SparkAvroPostProcessor; import org.apache.hudi.utilities.sources.CsvDFSSource; import org.apache.hudi.utilities.sources.HoodieIncrSource; @@ -68,6 +73,7 @@ import org.apache.hudi.utilities.transform.Transformer; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; @@ -102,6 +108,7 @@ import java.sql.DriverManager; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -1691,6 +1698,46 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2); } + @Test + public void testFetchingCheckpointFromPreviousCommits() throws IOException { + HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dfsBasePath + "/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT); + + TypedProperties properties = new TypedProperties(); + properties.setProperty("hoodie.datasource.write.recordkey.field", "key"); + properties.setProperty("hoodie.datasource.write.partitionpath.field", "pp"); + TestDeltaSync testDeltaSync = new TestDeltaSync(cfg, sparkSession, null, properties, + jsc, dfs, jsc.hadoopConfiguration(), null); + + properties.put(HoodieTableConfig.NAME.key(), "sample_tbl"); + HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.COPY_ON_WRITE, properties); + + Map<String, String> extraMetadata = new HashMap<>(); + 
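// Seed the timeline with a commit whose extra metadata carries a deltastreamer checkpoint, so the walk-back below has something to find. +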
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc"); + addCommitToTimeline(metaClient, extraMetadata); + metaClient.reloadActiveTimeline(); + assertEquals("abc", testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get()); + + addCommitToTimeline(metaClient, Collections.emptyMap()); + metaClient.reloadActiveTimeline(); + extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "def"); + addCommitToTimeline(metaClient, extraMetadata); + metaClient.reloadActiveTimeline(); + assertEquals("def", testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get()); + } + + class TestDeltaSync extends DeltaSync { + + public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, + JavaSparkContext jssc, FileSystem fs, Configuration conf, + Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException { + super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient); + } + + @Override + protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException { + return super.getPreviousCheckpoint(timeline); + } + } + /** * UDF to calculate Haversine distance. */ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index e67dfcca1..27502bfdf 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -64,6 +64,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE; import static org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE; import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER; +import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.addCommitToTimeline; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName; import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs; import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner; @@ -155,7 +156,11 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness @ParameterizedTest @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"}) - void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception { + public void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception { + testCheckpointCarryOver(tableType); + } + + private void testCheckpointCarryOver(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); @@ -196,34 +201,28 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc()); backfillJob.sync(); - // Save the 
checkpoint information from the deltastreamer run and perform next write - String checkpointAfterDeltaSync = getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY); - // this writer will enable HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that deltastreamer checkpoint will be carried over. - performWriteWithDeltastreamerStateMerge(); + meta.reloadActiveTimeline(); + int totalCommits = meta.getCommitsTimeline().filterCompletedInstants().countInstants(); - // Verify that the checkpoint is carried over - HoodieCommitMetadata commitMetaAfterDatasourceWrite = getLatestMetadata(meta); - Assertions.assertEquals(checkpointAfterDeltaSync, commitMetaAfterDatasourceWrite.getMetadata(CHECKPOINT_KEY)); + // Add a new commit to the timeline that does not carry the checkpoint in its extra metadata + addCommitToTimeline(meta); + meta.reloadActiveTimeline(); + verifyCommitMetadataCheckpoint(meta, null); + + cfgBackfillJob.checkpoint = null; + new HoodieDeltaStreamer(cfgBackfillJob, jsc()).sync(); // If the deltastreamer checkpoint fetch does not walk back to older commits, this sync will fail + meta.reloadActiveTimeline(); + Assertions.assertEquals(totalCommits + 2, meta.getCommitsTimeline().filterCompletedInstants().countInstants()); + verifyCommitMetadataCheckpoint(meta, "00008"); } - /** - * Performs a hudi datasource write with deltastreamer state merge enabled. - */ - private void performWriteWithDeltastreamerStateMerge() { - spark().read() - .format("hudi") - .load(tableBasePath + "/*/*.parquet") - .limit(1) - .write() - .format("hudi") - .option(HoodieWriteConfig.TBL_NAME.key(), COW_TEST_TABLE_NAME) - .option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL()) - .option(DataSourceWriteOptions.INSERT_DROP_DUPS().key(), "true") - .option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key") - .option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp") - .option(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key(), "true") - .mode(SaveMode.Append) - .save(tableBasePath + "/*/*.parquet"); + private void verifyCommitMetadataCheckpoint(HoodieTableMetaClient metaClient, String expectedCheckpoint) throws IOException { + HoodieCommitMetadata commitMeta = getLatestMetadata(metaClient); + if (expectedCheckpoint == null) { + Assertions.assertNull(commitMeta.getMetadata(CHECKPOINT_KEY)); + } else { + Assertions.assertEquals(expectedCheckpoint, commitMeta.getMetadata(CHECKPOINT_KEY)); + } } private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient meta) throws IOException {
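
For reference, the walk-back this patch introduces can be read in isolation. The following is a minimal, hypothetical sketch (the harness class and method name are illustrative, not part of the patch), assembled only from APIs that appear in the diff above: HoodieTimeline#getReverseOrderedInstants, HoodieCommitMetadata#fromBytes, and the CHECKPOINT_KEY constant from HoodieDeltaStreamer.

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.table.timeline.HoodieTimeline;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.exception.HoodieIOException;

    import java.io.IOException;

    import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;

    // Hypothetical harness; the production logic lives in DeltaSync#getPreviousCheckpoint.
    final class CheckpointWalkbackSketch {

      // Scan the timeline newest-to-oldest and return the first checkpoint found, if any.
      static Option<String> findPreviousCheckpoint(HoodieTimeline timeline) {
        return timeline.getReverseOrderedInstants().map(instant -> {
          try {
            HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                .fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
            // Commits from non-deltastreamer writers carry no CHECKPOINT_KEY and yield an empty Option.
            return Option.ofNullable(commitMetadata.getMetadata(CHECKPOINT_KEY));
          } catch (IOException e) {
            throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant, e);
          }
        }).filter(Option::isPresent).findFirst().orElse(Option.empty());
      }
    }

Because the instant stream is reverse-ordered and findFirst() short-circuits, a table that interleaves deltastreamer and plain Spark datasource writers resumes from the newest available checkpoint, which is why the commit-time copy-over (WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE) removed above is no longer needed.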