From 60831d69060aeb8f112fd7aff9c9323cd213ddb2 Mon Sep 17 00:00:00 2001
From: Sivabalan Narayanan
Date: Tue, 8 Feb 2022 12:03:07 -0500
Subject: [PATCH] [HUDI-3361] Fixing missing begin checkpoint in HoodieIncremental pull (#4755)

---
 .../utilities/sources/HoodieIncrSource.java     | 39 ++++++++++++-------
 .../sources/S3EventsHoodieIncrSource.java       | 33 ++++++++++------
 .../sources/helpers/IncrSourceHelper.java       | 16 ++++----
 .../sources/TestHoodieIncrSource.java           | 32 ++++++++++-----
 4 files changed, 75 insertions(+), 45 deletions(-)
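At the core of this change, IncrSourceHelper.calculateBeginAndEndInstants returns the query type alongside the begin/end instants, so callers can fall back to a snapshot query when the begin checkpoint is no longer in the active timeline (e.g. after archival). A minimal sketch of how callers unpack the new return value (names follow the patch; the sketch itself is not part of the diff):

    // Left element: query type ("incremental" vs. "snapshot");
    // right element: the (begin instant, end instant) checkpoints.
    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts =
        IncrSourceHelper.calculateBeginAndEndInstants(
            sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
    String queryType = queryTypeAndInstantEndpts.getKey();
    String beginInstantTime = queryTypeAndInstantEndpts.getValue().getLeft();
    String endInstantTime = queryTypeAndInstantEndpts.getValue().getRight();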
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 0eed93763..8d310223d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -31,7 +31,6 @@ import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -123,24 +122,34 @@ public class HoodieIncrSource extends RowSource {
     Option<String> beginInstant = lastCkptStr.isPresent()
         ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();
 
-    Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
+    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
         numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
 
-    if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
-      return Pair.of(Option.empty(), instantEndpts.getKey());
+    if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) {
+      LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getValue().getKey());
+      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getValue().getKey());
     }
 
+    Dataset<Row> source = null;
     // Do Incr pull. Set end instant if available.
-    DataFrameReader reader = sparkSession.read().format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
-        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight())
-        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
-            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
-                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()));
-
-    Dataset<Row> source = reader.load(srcPath);
+    if (queryTypeAndInstantEndpts.getKey().equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
+      source = sparkSession.read().format("org.apache.hudi")
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+          .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryTypeAndInstantEndpts.getValue().getLeft())
+          .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryTypeAndInstantEndpts.getValue().getRight())
+          .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+              props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                  DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+          .load(srcPath);
+    } else {
+      // If the checkpoint is missing from the source table and the strategy is READ_UPTO_LATEST_COMMIT, issue a snapshot query.
+      source = sparkSession.read().format("org.apache.hudi")
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
+          .load(srcPath)
+          // Filter so that only records committed after the begin instant are returned.
+          .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getLeft()));
+    }
 
     /*
      * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
@@ -168,6 +177,6 @@ public class HoodieIncrSource extends RowSource {
     final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream()
         .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new));
     // log.info("Final Schema from Source is :" + src.schema());
-    return Pair.of(Option.of(src), instantEndpts.getRight());
+    return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
   }
 }
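The snapshot fallback above relies on a string comparison against _hoodie_commit_time. This works because Hudi instant times are fixed-width numeric timestamps, so lexicographic order coincides with chronological order. A small illustration (the timestamp values are hypothetical):

    // Hudi instant times such as "20220208120307" compare correctly as strings:
    String olderCommit = "20220208120307";
    String newerCommit = "20220209093011";
    assert newerCommit.compareTo(olderCommit) > 0; // lexicographic == chronological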
Initial Source Schema :" + source.schema()); @@ -168,6 +177,6 @@ public class HoodieIncrSource extends RowSource { final Dataset src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream() .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new)); // log.info("Final Schema from Source is :" + src.schema()); - return Pair.of(Option.of(src), instantEndpts.getRight()); + return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight()); } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 286feb374..d272025dd 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceReadOptions; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -33,7 +34,6 @@ import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.DataFrameReader; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -101,24 +101,33 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty(); - Pair instantEndpts = + Pair> queryTypeAndInstantEndpts = IncrSourceHelper.calculateBeginAndEndInstants( sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy); - if (instantEndpts.getKey().equals(instantEndpts.getValue())) { - LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey()); - return Pair.of(Option.empty(), instantEndpts.getKey()); + if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) { + LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey()); + return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey()); } + Dataset source = null; // Do incremental pull. Set end instant if available. 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
index a370c314a..cbfb153ee 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -63,10 +64,10 @@ public class IncrSourceHelper {
    * @param numInstantsPerFetch Max Instants per fetch
    * @param beginInstant Last Checkpoint String
    * @param missingCheckpointStrategy when begin instant is missing, allow reading based on missing checkpoint strategy
-   * @return begin and end instants
+   * @return begin and end instants along with query type
    */
-  public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
-      int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
+  public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
+      int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
     ValidationUtils.checkArgument(numInstantsPerFetch > 0,
         "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -88,15 +89,14 @@ public class IncrSourceHelper {
       }
     });
 
-    if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+    if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || !activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
       Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
           .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
-      return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+      return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
     } else {
-      // if beginInstant is DEFAULT_BEGIN_TIMESTAMP, MissingCheckpointStrategy should be set.
-      // when MissingCheckpointStrategy is set to read everything until latest.
+      // When MissingCheckpointStrategy is set to read everything up to the latest commit, trigger a snapshot query.
       Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-      return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
+      return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), Pair.of(beginInstantTime, lastInstant.get().getTimestamp()));
     }
   }
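The rewritten guard above is the decision point: READ_LATEST always takes the incremental path (the begin instant is the latest commit), while for READ_UPTO_LATEST_COMMIT the incremental path is used only while the begin instant is still covered by the active timeline; once it has been archived, isBeforeTimelineStarts returns true and a snapshot query is issued instead. A worked example, using the commit layout from the test changes that follow (assumed: commits 100-300 archived, 400 and 500 still active):

    // checkpoint "400" -> still in the active timeline -> incremental query over (400, 500]
    // checkpoint "100" -> archived -> snapshot query + filter _hoodie_commit_time > '100'
    boolean archived = activeCommitTimeline.isBeforeTimelineStarts("100"); // true once archived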
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index 250e28829..87034bd8f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -20,12 +20,14 @@ package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -61,21 +63,31 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
 
   @Test
   public void testHoodieIncrSource() throws IOException {
-    HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).retainCommits(1).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .build();
 
     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
-    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null);
-    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
-    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);
+    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null, "100");
+    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null, "200");
+    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null, "300");
+    Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, true, null, "400");
+    Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, true, null, "500");
 
     // read everything up to the latest commit
-    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, 300, inserts3.getKey());
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey());
+
+    // even if the begin timestamp is archived (100), a full table scan should kick in, but it should filter for records with commit time > 100
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("100"), 400, inserts5.getKey());
+
+    // even if READ_UPTO_LATEST_COMMIT is set, if the begin timestamp is in the active timeline, only the incremental query should kick in
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("400"), 100, inserts5.getKey());
 
     // read just the latest
-    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, inserts5.getKey());
   }
 
-  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, int expectedCount, String expectedCheckpoint) {
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {
 
     Properties properties = new Properties();
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
@@ -84,14 +96,14 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
     HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));
 
     // read everything until latest
-    Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(Option.empty(), 500);
+    Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500);
     Assertions.assertNotNull(batchCheckPoint.getValue());
     assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
     Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
   }
 
-  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords) throws IOException {
-    String commit = writeClient.startCommit();
+  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
+    writeClient.startCommitWithTime(commit);
     List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
     List<WriteStatus> statuses = result.collect();
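The test now pins commit times via startCommitWithTime so each write lands at a known instant ("100" through "500"), and archiveCommitsWith(2, 3) forces the older commits out of the active timeline after five writes, which is what exercises the snapshot fallback. A hypothetical additional scenario in the same style (values illustrative only, not part of the patch):

    // Write a sixth commit, then pull from an archived checkpoint: commits
    // 300..600 qualify, i.e. 4 x 100 records, and the checkpoint advances to "600".
    Pair<String, List<HoodieRecord>> inserts6 = writeRecords(writeClient, true, null, "600");
    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
        Option.of("200"), 400, inserts6.getKey());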