
[HUDI-3361] Fixing missing begin checkpoint in HoodieIncremental pull (#4755)

Sivabalan Narayanan
2022-02-08 12:03:07 -05:00
committed by GitHub
parent 6a32cfe020
commit 60831d6906
4 changed files with 75 additions and 45 deletions

HoodieIncrSource.java

@@ -31,7 +31,6 @@ import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -123,24 +122,34 @@ public class HoodieIncrSource extends RowSource {
     Option<String> beginInstant =
         lastCkptStr.isPresent() ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr : Option.empty();
-    Pair<String, String> instantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
+    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts = IncrSourceHelper.calculateBeginAndEndInstants(sparkContext, srcPath,
         numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

-    if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
-      return Pair.of(Option.empty(), instantEndpts.getKey());
+    if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) {
+      LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey());
+      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey());
     }

+    Dataset<Row> source = null;
     // Do Incr pull. Set end instant if available
-    DataFrameReader reader = sparkSession.read().format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
-        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight())
-        .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
-            props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
-                DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()));
-    Dataset<Row> source = reader.load(srcPath);
+    if (queryTypeAndInstantEndpts.getKey().equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
+      source = sparkSession.read().format("org.apache.hudi")
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+          .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryTypeAndInstantEndpts.getValue().getLeft())
+          .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryTypeAndInstantEndpts.getValue().getRight())
+          .option(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+              props.getString(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().key(),
+                  DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN_FOR_NON_EXISTING_FILES().defaultValue()))
+          .load(srcPath);
+    } else {
+      // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
+      source = sparkSession.read().format("org.apache.hudi")
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
+          .load(srcPath)
+          // add filtering so that only interested records are returned.
+          .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getLeft()));
+    }

     /*
      * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
@@ -168,6 +177,6 @@ public class HoodieIncrSource extends RowSource {
     final Dataset<Row> src = source.drop(HoodieRecord.HOODIE_META_COLUMNS.stream()
         .filter(x -> !x.equals(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toArray(String[]::new));
     // log.info("Final Schema from Source is :" + src.schema());
-    return Pair.of(Option.of(src), instantEndpts.getRight());
+    return Pair.of(Option.of(src), queryTypeAndInstantEndpts.getRight().getRight());
   }
 }
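
For readers skimming the hunk above: a minimal, self-contained sketch (not the committed code) of the new control flow. The string option keys and values stand in for the DataSourceReadOptions constants used in the diff, and the class and method names here are illustrative only.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SnapshotFallbackSketch {
  // queryType and the begin/end instants come from
  // IncrSourceHelper.calculateBeginAndEndInstants(...), as in the diff above.
  static Dataset<Row> fetch(SparkSession spark, String srcPath, String queryType,
                            String beginInstant, String endInstant) {
    if ("incremental".equals(queryType)) {
      // Checkpoint is still on the active timeline: regular incremental pull.
      return spark.read().format("org.apache.hudi")
          .option("hoodie.datasource.query.type", "incremental")
          .option("hoodie.datasource.read.begin.instanttime", beginInstant)
          .option("hoodie.datasource.read.end.instanttime", endInstant)
          .load(srcPath);
    }
    // Checkpoint was archived: snapshot query, keeping only records whose
    // commit time is strictly greater than the begin instant.
    return spark.read().format("org.apache.hudi")
        .option("hoodie.datasource.query.type", "snapshot")
        .load(srcPath)
        .filter(String.format("_hoodie_commit_time > '%s'", beginInstant));
  }
}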

S3EventsHoodieIncrSource.java

@@ -22,6 +22,7 @@ import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -33,7 +34,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -101,24 +101,33 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         ? lastCkptStr.get().isEmpty() ? Option.empty() : lastCkptStr
         : Option.empty();
-    Pair<String, String> instantEndpts =
+    Pair<String, Pair<String, String>> queryTypeAndInstantEndpts =
         IncrSourceHelper.calculateBeginAndEndInstants(
             sparkContext, srcPath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);

-    if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
-      LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
-      return Pair.of(Option.empty(), instantEndpts.getKey());
+    if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) {
+      LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey());
+      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey());
     }

+    Dataset<Row> source = null;
     // Do incremental pull. Set end instant if available.
-    DataFrameReader metaReader = sparkSession.read().format("org.apache.hudi")
-        .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
-        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
-        .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
-    Dataset<Row> source = metaReader.load(srcPath);
+    if (queryTypeAndInstantEndpts.getKey().equals(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())) {
+      source = sparkSession.read().format("org.apache.hudi")
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
+          .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryTypeAndInstantEndpts.getRight().getLeft())
+          .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryTypeAndInstantEndpts.getRight().getRight()).load(srcPath);
+    } else {
+      // if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
+      source = sparkSession.read().format("org.apache.hudi")
+          .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL()).load(srcPath)
+          // add filtering so that only interested records are returned.
+          .filter(String.format("%s > '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
+              queryTypeAndInstantEndpts.getRight().getLeft()));
+    }

     if (source.isEmpty()) {
-      return Pair.of(Option.empty(), instantEndpts.getRight());
+      return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getRight().getRight());
     }

     String filter = "s3.object.size > 0";
@@ -167,6 +176,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     if (!cloudFiles.isEmpty()) {
       dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0])));
    }
-    return Pair.of(dataset, instantEndpts.getRight());
+    return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
   }
 }
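
Both sources hand the end instant back as the new checkpoint string. A sketch of the handoff loop a caller effectively performs around fetchNextBatch (the source is constructed as in the test file below; process is a hypothetical downstream handler):

// Checkpoint handoff sketch: the right element of each returned pair becomes
// the lastCkptStr of the next call, so progress survives across rounds.
Option<String> lastCkpt = Option.empty();
while (true) {
  Pair<Option<Dataset<Row>>, String> batch = incrSource.fetchNextBatch(lastCkpt, 500);
  if (!batch.getLeft().isPresent()) {
    break; // the "Already caught up" path in the sources above
  }
  process(batch.getLeft().get());          // hypothetical downstream handler
  lastCkpt = Option.of(batch.getRight());  // persist as the next begin checkpoint
}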

IncrSourceHelper.java

@@ -18,6 +18,7 @@
 package org.apache.hudi.utilities.sources.helpers;

+import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -63,10 +64,10 @@ public class IncrSourceHelper {
    * @param numInstantsPerFetch Max Instants per fetch
    * @param beginInstant Last Checkpoint String
    * @param missingCheckpointStrategy when begin instant is missing, allow reading based on missing checkpoint strategy
-   * @return begin and end instants
+   * @return begin and end instants along with query type.
    */
-  public static Pair<String, String> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
-      int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
+  public static Pair<String, Pair<String, String>> calculateBeginAndEndInstants(JavaSparkContext jssc, String srcBasePath,
+      int numInstantsPerFetch, Option<String> beginInstant, MissingCheckpointStrategy missingCheckpointStrategy) {
     ValidationUtils.checkArgument(numInstantsPerFetch > 0,
         "Make sure the config hoodie.deltastreamer.source.hoodieincr.num_instants is set to a positive value");
     HoodieTableMetaClient srcMetaClient = HoodieTableMetaClient.builder().setConf(jssc.hadoopConfiguration()).setBasePath(srcBasePath).setLoadActiveTimelineOnLoad(true).build();
@@ -88,15 +89,14 @@ public class IncrSourceHelper {
       }
     });

     if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+      if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST || !activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
         Option<HoodieInstant> nthInstant = Option.fromJavaOptional(activeCommitTimeline
             .findInstantsAfter(beginInstantTime, numInstantsPerFetch).getInstants().reduce((x, y) -> y));
-        return Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime));
+        return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(), Pair.of(beginInstantTime, nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
       } else {
         // if beginInstant is DEFAULT_BEGIN_TIMESTAMP, MissingCheckpointStrategy should be set.
-        // when MissingCheckpointStrategy is set to read everything until latest.
+        // when MissingCheckpointStrategy is set to read everything until latest, trigger snapshot query.
         Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
-        return Pair.of(beginInstantTime, lastInstant.get().getTimestamp());
+        return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(), Pair.of(beginInstantTime, lastInstant.get().getTimestamp()));
       }
     }
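
Call sites unpack the new nested return value as follows (a sketch mirroring the two sources above; variable names are illustrative):

Pair<String, Pair<String, String>> res = IncrSourceHelper.calculateBeginAndEndInstants(
    jssc, srcBasePath, numInstantsPerFetch, beginInstant, missingCheckpointStrategy);
String queryType = res.getLeft();                    // "incremental" or "snapshot"
String beginInstantTime = res.getRight().getLeft();  // begin checkpoint
String endInstantTime = res.getRight().getRight();   // end instant, i.e. the next checkpoint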

TestHoodieIncrSource.java

@@ -20,12 +20,14 @@ package org.apache.hudi.utilities.sources;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -61,21 +63,31 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {

   @Test
   public void testHoodieIncrSource() throws IOException {
-    HoodieWriteConfig writeConfig = getConfigBuilder(basePath).build();
+    HoodieWriteConfig writeConfig = getConfigBuilder(basePath)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2,3).retainCommits(1).build()).withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();

     SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
-    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null);
-    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null);
-    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null);
+    Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, true, null, "100");
+    Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, true, null, "200");
+    Pair<String, List<HoodieRecord>> inserts3 = writeRecords(writeClient, true, null, "300");
+    Pair<String, List<HoodieRecord>> inserts4 = writeRecords(writeClient, true, null, "400");
+    Pair<String, List<HoodieRecord>> inserts5 = writeRecords(writeClient, true, null, "500");

     // read everything upto latest
-    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, 300, inserts3.getKey());
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey());
+
+    // even if the begin timestamp is archived (100), full table scan should kick in, but should filter for records having commit time > 100
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("100"), 400, inserts5.getKey());
+
+    // even if the read upto latest is set, if begin timestamp is in active timeline, only incremental should kick in.
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.of("400"), 100, inserts5.getKey());

     // read just the latest
-    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, 100, inserts3.getKey());
+    readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, inserts5.getKey());
   }

-  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, int expectedCount, String expectedCheckpoint) {
+  private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount, String expectedCheckpoint) {

     Properties properties = new Properties();
     properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath);
@@ -84,14 +96,14 @@ public class TestHoodieIncrSource extends HoodieClientTestHarness {
     HoodieIncrSource incrSource = new HoodieIncrSource(typedProperties, jsc, sparkSession, new TestSchemaProvider(HoodieTestDataGenerator.AVRO_SCHEMA));

     // read everything until latest
-    Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(Option.empty(), 500);
+    Pair<Option<Dataset<Row>>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500);
     Assertions.assertNotNull(batchCheckPoint.getValue());
     assertEquals(batchCheckPoint.getKey().get().count(), expectedCount);
     Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint);
   }

-  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords) throws IOException {
-    String commit = writeClient.startCommit();
+  public Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient, boolean insert, List<HoodieRecord> insertRecords, String commit) throws IOException {
+    writeClient.startCommitWithTime(commit);
     List<HoodieRecord> records = insert ? dataGen.generateInserts(commit, 100) : dataGen.generateUpdates(commit, insertRecords);
     JavaRDD<WriteStatus> result = writeClient.upsert(jsc.parallelize(records, 1), commit);
     List<WriteStatus> statuses = result.collect();
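
The pinned commit times are what make this test deterministic: with archiveCommitsWith(2,3) and retainCommits(1), commit "100" has been archived off the active timeline by the time "500" lands, which is exactly what exercises the snapshot fallback. A condensed sketch of the write pattern, using only calls from the diff above:

// Pin commit times so archival deterministically moves early instants
// off the active timeline.
SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context, writeConfig);
for (String commitTime : new String[] {"100", "200", "300", "400", "500"}) {
  writeClient.startCommitWithTime(commitTime);
  List<HoodieRecord> records = dataGen.generateInserts(commitTime, 100);
  writeClient.upsert(jsc.parallelize(records, 1), commitTime).collect();
}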