1
0

[HUDI-2579] Make deltastreamer checkpoint state merging more explicit (#3820)

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
davehagman
2021-11-09 17:37:59 -05:00
committed by GitHub
parent 2f95967dfe
commit dfe3b84715
6 changed files with 195 additions and 52 deletions

View File

@@ -18,7 +18,7 @@
package org.apache.hudi.client.utils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
@@ -53,8 +53,12 @@ public class TransactionUtils {
* @return
* @throws HoodieWriteConflictException
*/
public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(final HoodieTable table, final Option<HoodieInstant> currentTxnOwnerInstant,
final Option<HoodieCommitMetadata> thisCommitMetadata, final HoodieWriteConfig config, Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
final HoodieTable table,
final Option<HoodieInstant> currentTxnOwnerInstant,
final Option<HoodieCommitMetadata> thisCommitMetadata,
final HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant) throws HoodieWriteConflictException {
if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
Stream<HoodieInstant> instantStream = resolutionStrategy.getCandidateInstants(table.getActiveTimeline(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant);
@@ -72,8 +76,11 @@ public class TransactionUtils {
}
});
LOG.info("Successfully resolved conflicts, if any");
// carry over necessary metadata from latest commit metadata
overrideWithLatestCommitMetadata(table.getMetaClient(), thisOperation.getCommitMetadataOption(), currentTxnOwnerInstant, Arrays.asList(config.getWriteMetaKeyPrefixes().split(",")));
if (config.mergeDeltastreamerStateFromPreviousCommit()) {
mergeCheckpointStateFromPreviousCommit(table.getMetaClient(), thisOperation.getCommitMetadataOption());
}
return thisOperation.getCommitMetadataOption();
}
return thisCommitMetadata;
@@ -111,16 +118,27 @@ public class TransactionUtils {
}
}
// override the current metadata with the metadata from the latest instant for the specified key prefixes
private static void overrideWithLatestCommitMetadata(HoodieTableMetaClient metaClient, Option<HoodieCommitMetadata> thisMetadata,
Option<HoodieInstant> thisInstant, List<String> keyPrefixes) {
/**
 * Merges deltastreamer checkpoint state from the previous commit into the given commit metadata.
 * Delegates to the generic override helper, restricted to the single key prefix
 * {@link HoodieWriteConfig#DELTASTREAMER_CHECKPOINT_KEY}.
 *
 * @param metaClient   meta client passed through to the override helper
 * @param thisMetadata metadata of the commit being written; passed through to the override helper
 */
protected static void mergeCheckpointStateFromPreviousCommit(HoodieTableMetaClient metaClient, Option<HoodieCommitMetadata> thisMetadata) {
  final List<String> checkpointKeyPrefixes = Collections.singletonList(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY);
  overrideWithLatestCommitMetadata(metaClient, thisMetadata, checkpointKeyPrefixes);
}
/**
* Generic method allowing us to override the current metadata with the metadata from
* the latest instant for the specified key prefixes.
* @param metaClient
* @param thisMetadata
* @param keyPrefixes The key prefixes to merge from the previous commit
*/
private static void overrideWithLatestCommitMetadata(HoodieTableMetaClient metaClient,
Option<HoodieCommitMetadata> thisMetadata,
List<String> keyPrefixes) {
if (keyPrefixes.size() == 1 && keyPrefixes.get(0).length() < 1) {
return;
}
Option<Pair<HoodieInstant, Map<String, String>>> lastInstant = getLastCompletedTxnInstantAndMetadata(metaClient);
if (lastInstant.isPresent() && thisMetadata.isPresent()) {
Stream<String> keys = thisMetadata.get().getExtraMetadata().keySet().stream();
keyPrefixes.stream().forEach(keyPrefix -> keys
Stream<String> lastCommitMetadataKeys = lastInstant.get().getRight().keySet().stream();
keyPrefixes.stream().forEach(keyPrefix -> lastCommitMetadataKeys
.filter(key -> key.startsWith(keyPrefix))
.forEach(key -> thisMetadata.get().getExtraMetadata().put(key, lastInstant.get().getRight().get(key))));
}

View File

@@ -85,6 +85,10 @@ public class HoodieWriteConfig extends HoodieConfig {
private static final long serialVersionUID = 0L;
// This is a constant as it should never be changed via config (will invalidate previous commits)
// It is here so that both the client and deltastreamer use the same reference
public static final String DELTASTREAMER_CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static final ConfigProperty<String> TBL_NAME = ConfigProperty
.key("hoodie.table.name")
.noDefaultValue()
@@ -368,11 +372,13 @@ public class HoodieWriteConfig extends HoodieConfig {
+ "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed "
+ "if a conflict (writes affect the same file group) is detected.");
public static final ConfigProperty<String> WRITE_META_KEY_PREFIXES = ConfigProperty
.key("hoodie.write.meta.key.prefixes")
.defaultValue("")
.withDocumentation("Comma separated metadata key prefixes to override from latest commit "
+ "during overlapping commits via multi writing");
// Opt-in switch (default false) that lets a non-deltastreamer writer carry the deltastreamer
// checkpoint forward from the previous commit.
// NOTE(review): the alternative key below used to hold a comma separated list of key prefixes
// (a String value); confirm how such legacy values are interpreted when this property is read
// as a Boolean.
public static final ConfigProperty<Boolean> WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE = ConfigProperty
    .key("hoodie.write.concurrency.merge.deltastreamer.state")
    .defaultValue(false)
    .withAlternatives("hoodie.write.meta.key.prefixes")
    .withDocumentation("If enabled, this writer will merge Deltastreamer state from the previous checkpoint in order to allow both realtime "
        + "and batch writers to ingest into a single table. This should not be enabled on Deltastreamer writers. Enabling this config means, "
        + "for a spark writer, deltastreamer checkpoint will be copied over from previous commit to the current one.");
/**
* Currently, we use this to specify the write schema.
@@ -783,16 +789,6 @@ public class HoodieWriteConfig extends HoodieConfig {
*/
@Deprecated
public static final String DEFAULT_WRITE_CONCURRENCY_MODE = WRITE_CONCURRENCY_MODE.defaultValue();
/**
* @deprecated Use {@link #WRITE_META_KEY_PREFIXES} and its methods instead
*/
@Deprecated
public static final String WRITE_META_KEY_PREFIXES_PROP = WRITE_META_KEY_PREFIXES.key();
/**
* @deprecated Use {@link #WRITE_META_KEY_PREFIXES} and its methods instead
*/
@Deprecated
public static final String DEFAULT_WRITE_META_KEY_PREFIXES = WRITE_META_KEY_PREFIXES.defaultValue();
/**
* @deprecated Use {@link #ALLOW_MULTI_WRITE_ON_SAME_INSTANT_ENABLE} and its methods instead
*/
@@ -1764,12 +1760,12 @@ public class HoodieWriteConfig extends HoodieConfig {
return WriteConcurrencyMode.fromValue(getString(WRITE_CONCURRENCY_MODE));
}
public Boolean inlineTableServices() {
return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean();
/**
 * @return the value configured for {@link #WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE}
 */
public Boolean mergeDeltastreamerStateFromPreviousCommit() {
  final Boolean mergeEnabled = getBoolean(WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE);
  return mergeEnabled;
}
public String getWriteMetaKeyPrefixes() {
return getString(WRITE_META_KEY_PREFIXES);
/**
 * @return {@code true} if any of {@code inlineClusteringEnabled()}, {@code inlineCompactionEnabled()}
 *         or {@code isAutoClean()} returns {@code true}; evaluated in that order with short-circuiting
 */
public Boolean inlineTableServices() {
return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean();
}
public String getPreCommitValidators() {
@@ -2131,11 +2127,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
public Builder withWriteMetaKeyPrefixes(String writeMetaKeyPrefixes) {
writeConfig.setValue(WRITE_META_KEY_PREFIXES, writeMetaKeyPrefixes);
return this;
}
public Builder withPopulateMetaFields(boolean populateMetaFields) {
writeConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS, Boolean.toString(populateMetaFields));
return this;

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utils;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
 * Tests the checkpoint state merge helper exposed by {@link TransactionUtils}.
 */
public class TestTransactionUtils extends HoodieCommonTestHarness {

  @BeforeEach
  public void setUp() throws Exception {
    init();
  }

  /**
   * Initializes the base path and the meta client, then creates the base path directory.
   */
  public void init() throws Exception {
    initPath();
    initMetaClient();
    metaClient.getFs().mkdirs(new Path(basePath));
  }

  @Test
  public void testCheckpointStateMerge() throws IOException {
    final HoodieActiveTimeline activeTimeline = new HoodieActiveTimeline(metaClient);
    final String expectedCheckpoint = "00001";

    // Create completed commit with deltastreamer checkpoint state
    final HoodieInstant commitWithCheckpoint = createCommitInstant(activeTimeline);
    final HoodieCommitMetadata metadataWithCheckpoint = new HoodieCommitMetadata();
    metadataWithCheckpoint.addMetadata(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, expectedCheckpoint);
    final byte[] serializedMetadata = metadataWithCheckpoint.toJsonString().getBytes(StandardCharsets.UTF_8);
    activeTimeline.saveAsComplete(commitWithCheckpoint, Option.of(serializedMetadata));

    // Inflight commit without checkpoint metadata
    createCommitInstant(activeTimeline);
    final HoodieCommitMetadata metadataWithoutCheckpoint = new HoodieCommitMetadata();

    // Ensure that checkpoint state is merged in from previous completed commit
    MockTransactionUtils.assertCheckpointStateWasMerged(metaClient, metadataWithoutCheckpoint, expectedCheckpoint);
  }

  /**
   * Builds a commit instant with a freshly generated instant time and registers it on the timeline.
   */
  private static HoodieInstant createCommitInstant(HoodieActiveTimeline timeline) throws IOException {
    final HoodieInstant instant =
        new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime());
    timeline.createNewInstant(instant);
    return instant;
  }

  /**
   * Subclass of {@link TransactionUtils} used so the test can call its protected merge helper.
   */
  private static class MockTransactionUtils extends TransactionUtils {

    public static void assertCheckpointStateWasMerged(
        HoodieTableMetaClient metaClient,
        HoodieCommitMetadata currentMetadata,
        String expectedCheckpointState) {
      TransactionUtils.mergeCheckpointStateFromPreviousCommit(metaClient, Option.of(currentMetadata));
      final String mergedCheckpoint =
          currentMetadata.getExtraMetadata().get(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY);
      assertEquals(expectedCheckpointState, mergedCheckpoint);
    }
  }
}

View File

@@ -106,6 +106,7 @@ import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -715,6 +716,11 @@ public class DeltaSync implements Serializable {
String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), cfg.filterDupes));
ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert));
ValidationUtils.checkArgument(!config.mergeDeltastreamerStateFromPreviousCommit(),
String.format(
"Deltastreamer processes should not merge state from previous deltastreamer commits. Please unset '%s'",
WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key())
);
return config;
}

View File

@@ -44,6 +44,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncTool;
@@ -90,7 +91,7 @@ public class HoodieDeltaStreamer implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class);
public static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static final String CHECKPOINT_KEY = HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY;
public static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
protected final transient Config cfg;

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
@@ -35,6 +37,7 @@ import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
@@ -71,6 +74,8 @@ import static org.apache.hudi.utilities.testutils.sources.AbstractBaseTestSource
@Tag("functional")
public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctionalTestHarness {
private static final String COW_TEST_TABLE_NAME = "testtable_COPY_ON_WRITE";
String basePath;
String propsFilePath;
String tableBasePath;
@@ -154,7 +159,6 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
setUpTestTable(tableType);
prepareInitialConfigs(fs(), basePath, "foo");
// enable carrying forward latest checkpoint
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
@@ -171,36 +175,61 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TripsWithDistanceTransformer.class.getName()));
cfgBackfillJob.continuousMode = false;
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
HoodieCommitMetadata commitMetadataForFirstInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
// get current checkpoint after preparing base dataset with some commits
HoodieCommitMetadata commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointBeforeParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
// run the backfill job, enable overriding checkpoint from the latest commit
// run the backfill job
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
props.setProperty("hoodie.write.meta.key.prefixes", CHECKPOINT_KEY);
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
// reset checkpoint to first instant to simulate a random checkpoint for backfill job
// checkpoint will move from 00000 to 00001 for this backfill job
cfgBackfillJob.checkpoint = commitMetadataForFirstInstant.getMetadata(CHECKPOINT_KEY);
// get current checkpoint after preparing base dataset with some commits
HoodieCommitMetadata commitMetadataForLastInstant = getLatestMetadata(meta);
// Set checkpoint to the last successful position
cfgBackfillJob.checkpoint = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
cfgBackfillJob.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
cfgBackfillJob.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN.key()));
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
backfillJob.sync();
// check if the checkpoint is carried over
timeline = meta.getActiveTimeline().reload().getCommitsTimeline().filterCompletedInstants();
commitMetadataForLastInstant = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(timeline.lastInstant().get()).get(), HoodieCommitMetadata.class);
String lastCheckpointAfterParallelBackfill = commitMetadataForLastInstant.getMetadata(CHECKPOINT_KEY);
Assertions.assertEquals(lastCheckpointBeforeParallelBackfill, lastCheckpointAfterParallelBackfill);
// Save the checkpoint information from the deltastreamer run and perform next write
String checkpointAfterDeltaSync = getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY);
// this writer will enable HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that deltastreamer checkpoint will be carried over.
performWriteWithDeltastreamerStateMerge();
// Verify that the checkpoint is carried over
HoodieCommitMetadata commitMetaAfterDatasourceWrite = getLatestMetadata(meta);
Assertions.assertEquals(checkpointAfterDeltaSync, commitMetaAfterDatasourceWrite.getMetadata(CHECKPOINT_KEY));
}
/**
* Performs a hudi datasource write with deltastreamer state merge enabled.
* Loads the table through a glob under {@code tableBasePath}, keeps a single row, and appends it
* back as an insert with {@code HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE}
* set to "true".
*/
private void performWriteWithDeltastreamerStateMerge() {
spark().read()
.format("hudi")
.load(tableBasePath + "/*/*.parquet")
// only one row is written back
.limit(1)
.write()
.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key(), COW_TEST_TABLE_NAME)
.option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
.option(DataSourceWriteOptions.INSERT_DROP_DUPS().key(), "true")
.option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp")
// the setting under test: ask this writer to merge deltastreamer state
.option(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key(), "true")
.mode(SaveMode.Append)
// NOTE(review): the save target is the same glob string used for load(), not tableBasePath itself,
// while the caller inspects the latest commit through a meta client built on tableBasePath.
// Confirm this write actually lands on the table under test.
.save(tableBasePath + "/*/*.parquet");
}
/**
 * Reloads the active timeline and deserializes the commit metadata of the last completed
 * instant on the commits timeline.
 *
 * @param meta meta client of the table to inspect
 * @return commit metadata of the last completed commit instant
 * @throws IOException declared by the deserialization call
 */
private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient meta) throws IOException {
  final HoodieTimeline completedCommits = meta.getActiveTimeline()
      .reload()
      .getCommitsTimeline()
      .filterCompletedInstants();
  final byte[] latestCommitBytes = completedCommits.getInstantDetails(completedCommits.lastInstant().get()).get();
  return HoodieCommitMetadata.fromBytes(latestCommitBytes, HoodieCommitMetadata.class);
}
private static TypedProperties prepareMultiWriterProps(FileSystem fs, String basePath, String propsFilePath) throws IOException {