1
0

[HUDI-2793] Fixing deltastreamer checkpoint fetch/copy over (#4034)

- Removed the copy over logic in transaction utils. Deltastreamer will go back to previous commits and get the checkpoint value.
This commit is contained in:
Sivabalan Narayanan
2021-11-24 18:26:40 -05:00
committed by GitHub
parent ff94d92980
commit 435ea1543c
7 changed files with 134 additions and 188 deletions

View File

@@ -50,6 +50,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.keygen.KeyGenerator;
@@ -106,7 +107,6 @@ import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT;
import static org.apache.hudi.config.HoodieWriteConfig.AUTO_COMMIT_ENABLE;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_INSERT;
import static org.apache.hudi.config.HoodieWriteConfig.COMBINE_BEFORE_UPSERT;
import static org.apache.hudi.config.HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_RESET_KEY;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -339,11 +339,17 @@ public class DeltaSync implements Serializable {
resumeCheckpointStr = Option.empty();
} else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
HoodieTimeline.LESSER_THAN, lastCommit.get().getTimestamp())) {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+ commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+ commitMetadata.toJsonString());
// if previous commit metadata did not have the checkpoint key, try traversing previous commits until we find one.
Option<String> prevCheckpoint = getPreviousCheckpoint(commitTimelineOpt.get());
if (prevCheckpoint.isPresent()) {
resumeCheckpointStr = prevCheckpoint;
} else {
throw new HoodieDeltaStreamerException(
"Unable to find previous checkpoint. Please double check if this table "
+ "was indeed built via delta streamer. Last Commit :" + lastCommit + ", Instants :"
+ commitTimelineOpt.get().getInstants().collect(Collectors.toList()) + ", CommitMetadata="
+ commitMetadata.toJsonString());
}
}
// KAFKA_CHECKPOINT_TYPE will be honored only for first batch.
if (!StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))) {
@@ -451,6 +457,18 @@ public class DeltaSync implements Serializable {
return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
}
protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
return timeline.getReverseOrderedInstants().map(instant -> {
try {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
return Option.ofNullable(commitMetadata.getMetadata(CHECKPOINT_KEY));
} catch (IOException e) {
throw new HoodieIOException("Failed to parse HoodieCommitMetadata for " + instant.toString(), e);
}
}).filter(Option::isPresent).findFirst().orElse(Option.empty());
}
/**
* Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
*
@@ -716,12 +734,6 @@ public class DeltaSync implements Serializable {
String.format("%s should be set to %s", COMBINE_BEFORE_INSERT.key(), cfg.filterDupes));
ValidationUtils.checkArgument(config.shouldCombineBeforeUpsert(),
String.format("%s should be set to %s", COMBINE_BEFORE_UPSERT.key(), combineBeforeUpsert));
ValidationUtils.checkArgument(!config.mergeDeltastreamerStateFromPreviousCommit(),
String.format(
"Deltastreamer processes should not merge state from previous deltastreamer commits. Please unset '%s'",
WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key())
);
return config;
}

View File

@@ -20,7 +20,14 @@ package org.apache.hudi.utilities.functional;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
@@ -35,6 +42,9 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
@@ -277,4 +287,20 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
}
}
static void addCommitToTimeline(HoodieTableMetaClient metaCient) throws IOException {
addCommitToTimeline(metaCient, Collections.emptyMap());
}
static void addCommitToTimeline(HoodieTableMetaClient metaCient, Map<String, String> extraMetadata) throws IOException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
commitMetadata.setOperationType(WriteOperationType.UPSERT);
extraMetadata.forEach((k,v) -> commitMetadata.getExtraMetadata().put(k, v));
String commitTime = HoodieActiveTimeline.createNewInstantTime();
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, commitTime));
metaCient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime));
metaCient.getActiveTimeline().saveAsComplete(
new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, commitTime),
Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
}

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.functional;
import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -37,10 +38,12 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
@@ -48,8 +51,10 @@ import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.schema.SparkAvroPostProcessor;
import org.apache.hudi.utilities.sources.CsvDFSSource;
import org.apache.hudi.utilities.sources.HoodieIncrSource;
@@ -68,6 +73,7 @@ import org.apache.hudi.utilities.transform.Transformer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
@@ -102,6 +108,7 @@ import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -1691,6 +1698,46 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.assertCommitMetadata("00001", tableBasePath, dfs, 2);
}
@Test
public void testFetchingCheckpointFromPreviousCommits() throws IOException {
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(dfsBasePath + "/testFetchPreviousCheckpoint", WriteOperationType.BULK_INSERT);
TypedProperties properties = new TypedProperties();
properties.setProperty("hoodie.datasource.write.recordkey.field","key");
properties.setProperty("hoodie.datasource.write.partitionpath.field","pp");
TestDeltaSync testDeltaSync = new TestDeltaSync(cfg, sparkSession, null, properties,
jsc, dfs, jsc.hadoopConfiguration(), null);
properties.put(HoodieTableConfig.NAME.key(), "sample_tbl");
HoodieTableMetaClient metaClient = HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath, HoodieTableType.COPY_ON_WRITE, properties);
Map<String, String> extraMetadata = new HashMap<>();
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "abc");
addCommitToTimeline(metaClient, extraMetadata);
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "abc");
addCommitToTimeline(metaClient, Collections.emptyMap());
metaClient.reloadActiveTimeline();
extraMetadata.put(HoodieWriteConfig.DELTASTREAMER_CHECKPOINT_KEY, "def");
addCommitToTimeline(metaClient, extraMetadata);
metaClient.reloadActiveTimeline();
assertEquals(testDeltaSync.getPreviousCheckpoint(metaClient.getActiveTimeline().getCommitsTimeline()).get(), "def");
}
class TestDeltaSync extends DeltaSync {
public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
JavaSparkContext jssc, FileSystem fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient);
}
protected Option<String> getPreviousCheckpoint(HoodieTimeline timeline) throws IOException {
return super.getPreviousCheckpoint(timeline);
}
}
/**
* UDF to calculate Haversine distance.
*/

View File

@@ -64,6 +64,7 @@ import static org.apache.hudi.config.HoodieWriteConfig.INSERT_PARALLELISM_VALUE;
import static org.apache.hudi.config.HoodieWriteConfig.UPSERT_PARALLELISM_VALUE;
import static org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.CHECKPOINT_KEY;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.PROPS_FILENAME_TEST_MULTI_WRITER;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.addCommitToTimeline;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.defaultSchemaProviderClassName;
import static org.apache.hudi.utilities.functional.HoodieDeltaStreamerTestBase.prepareInitialConfigs;
import static org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.deltaStreamerTestRunner;
@@ -155,7 +156,11 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
@ParameterizedTest
@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE"})
void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
public void testLatestCheckpointCarryOverWithMultipleWriters(HoodieTableType tableType) throws Exception {
testCheckpointCarryOver(tableType);
}
private void testCheckpointCarryOver(HoodieTableType tableType) throws Exception {
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
setUpTestTable(tableType);
prepareInitialConfigs(fs(), basePath, "foo");
@@ -196,34 +201,28 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
HoodieDeltaStreamer backfillJob = new HoodieDeltaStreamer(cfgBackfillJob, jsc());
backfillJob.sync();
// Save the checkpoint information from the deltastreamer run and perform next write
String checkpointAfterDeltaSync = getLatestMetadata(meta).getMetadata(CHECKPOINT_KEY);
// this writer will enable HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key() so that deltastreamer checkpoint will be carried over.
performWriteWithDeltastreamerStateMerge();
meta.reloadActiveTimeline();
int totalCommits = meta.getCommitsTimeline().filterCompletedInstants().countInstants();
// Verify that the checkpoint is carried over
HoodieCommitMetadata commitMetaAfterDatasourceWrite = getLatestMetadata(meta);
Assertions.assertEquals(checkpointAfterDeltaSync, commitMetaAfterDatasourceWrite.getMetadata(CHECKPOINT_KEY));
// add a new commit to timeline which may not have the checkpoint in extra metadata
addCommitToTimeline(meta);
meta.reloadActiveTimeline();
verifyCommitMetadataCheckpoint(meta, null);
cfgBackfillJob.checkpoint = null;
new HoodieDeltaStreamer(cfgBackfillJob, jsc()).sync(); // if deltastreamer checkpoint fetch does not walk back to older commits, this sync will fail
meta.reloadActiveTimeline();
Assertions.assertEquals(totalCommits + 2, meta.getCommitsTimeline().filterCompletedInstants().countInstants());
verifyCommitMetadataCheckpoint(meta, "00008");
}
/**
* Performs a hudi datasource write with deltastreamer state merge enabled.
*/
private void performWriteWithDeltastreamerStateMerge() {
spark().read()
.format("hudi")
.load(tableBasePath + "/*/*.parquet")
.limit(1)
.write()
.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key(), COW_TEST_TABLE_NAME)
.option(DataSourceWriteOptions.OPERATION().key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())
.option(DataSourceWriteOptions.INSERT_DROP_DUPS().key(), "true")
.option(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp")
.option(HoodieWriteConfig.WRITE_CONCURRENCY_MERGE_DELTASTREAMER_STATE.key(), "true")
.mode(SaveMode.Append)
.save(tableBasePath + "/*/*.parquet");
private void verifyCommitMetadataCheckpoint(HoodieTableMetaClient metaClient, String expectedCheckpoint) throws IOException {
HoodieCommitMetadata commitMeta = getLatestMetadata(metaClient);
if (expectedCheckpoint == null) {
Assertions.assertNull(commitMeta.getMetadata(CHECKPOINT_KEY));
} else {
Assertions.assertEquals(expectedCheckpoint, commitMeta.getMetadata(CHECKPOINT_KEY));
}
}
private static HoodieCommitMetadata getLatestMetadata(HoodieTableMetaClient meta) throws IOException {