From a09c2319119d3e1a4cd9200f95e71663da4d7458 Mon Sep 17 00:00:00 2001 From: Yann Byron Date: Tue, 18 Jan 2022 23:50:30 +0800 Subject: [PATCH] [HUDI-2903] get table schema from the last commit with data written (#4180) --- .../common/table/TableSchemaResolver.java | 102 +++----- .../table/timeline/HoodieActiveTimeline.java | 25 ++ .../table/timeline/HoodieDefaultTimeline.java | 23 -- .../common/table/timeline/HoodieTimeline.java | 13 - .../functional/TestHoodieActiveTimeline.scala | 233 ++++++++++++++++++ 5 files changed, 288 insertions(+), 108 deletions(-) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index a953ac3ab..a70774896 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -21,8 +21,10 @@ package org.apache.hudi.common.table; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.SchemaCompatibility; + import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; + import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -40,8 +42,10 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; + import org.apache.parquet.avro.AvroSchemaConverter; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; @@ -73,72 +77,41 @@ public class TableSchemaResolver { * commit. We will assume that the schema has not changed within a single atomic write. 
* * @return Parquet schema for this table - * @throws Exception */ - private MessageType getTableParquetSchemaFromDataFile() throws Exception { + private MessageType getTableParquetSchemaFromDataFile() { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); - + Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = + activeTimeline.getLastCommitMetadataWithValidData(); try { switch (metaClient.getTableType()) { case COPY_ON_WRITE: - // If this is COW, get the last commit and read the schema from a file written in the - // last commit - HoodieInstant lastCommit = - activeTimeline.getCommitsTimeline().filterCompletedInstantsWithCommitMetadata() - .lastInstant().orElseThrow(() -> new InvalidTableException(metaClient.getBasePath())); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(activeTimeline.getInstantDetails(lastCommit).get(), HoodieCommitMetadata.class); - String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny() - .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for commit " - + lastCommit + ", could not get schema for table " + metaClient.getBasePath() + ", Metadata :" - + commitMetadata)); - return readSchemaFromBaseFile(new Path(filePath)); - case MERGE_ON_READ: - // If this is MOR, depending on whether the latest commit is a delta commit or - // compaction commit - // Get a datafile written and get the schema from that file - Option<HoodieInstant> lastCompactionCommit = metaClient.getActiveTimeline().getCommitTimeline() - .filterCompletedInstantsWithCommitMetadata().lastInstant(); - LOG.info("Found the last compaction commit as " + lastCompactionCommit); - - Option<HoodieInstant> lastDeltaCommit; - if (lastCompactionCommit.isPresent()) { - lastDeltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants() - .findInstantsAfter(lastCompactionCommit.get().getTimestamp(), Integer.MAX_VALUE).lastInstant(); + // For a COW table, the data file written must currently be a parquet file. + if (instantAndCommitMetadata.isPresent()) { + HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); + String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get(); + return readSchemaFromBaseFile(new Path(filePath)); } else { - lastDeltaCommit = - metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant(); + throw new IllegalArgumentException("Could not find any data file written for commit, " + + "so could not get schema for table " + metaClient.getBasePath()); } - LOG.info("Found the last delta commit " + lastDeltaCommit); - - if (lastDeltaCommit.isPresent()) { - HoodieInstant lastDeltaInstant = lastDeltaCommit.get(); - // read from the log file wrote - commitMetadata = HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(lastDeltaInstant).get(), - HoodieCommitMetadata.class); - Pair<String, HoodieFileFormat> filePathWithFormat = - commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() - .filter(s -> s.contains(HoodieLogFile.DELTA_EXTENSION)).findAny() - .map(f -> Pair.of(f, HoodieFileFormat.HOODIE_LOG)).orElseGet(() -> { - // No Log files in Delta-Commit. Check if there are any parquet files
- return commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream() - .filter(s -> s.contains((metaClient.getTableConfig().getBaseFileFormat().getFileExtension()))) - .findAny().map(f -> Pair.of(f, HoodieFileFormat.PARQUET)).orElseThrow(() -> - new IllegalArgumentException("Could not find any data file written for commit " - + lastDeltaInstant + ", could not get schema for table " + metaClient.getBasePath() - + ", CommitMetadata :" + commitMetadata)); - }); - switch (filePathWithFormat.getRight()) { - case HOODIE_LOG: - return readSchemaFromLogFile(lastCompactionCommit, new Path(filePathWithFormat.getLeft())); - case PARQUET: - return readSchemaFromBaseFile(new Path(filePathWithFormat.getLeft())); - default: - throw new IllegalArgumentException("Unknown file format :" + filePathWithFormat.getRight() - + " for file " + filePathWithFormat.getLeft()); + case MERGE_ON_READ: + // For a MOR table, the data file written may be either a parquet base file or a .log file. + // Determine the file format from the file name, and then extract the schema from it. + if (instantAndCommitMetadata.isPresent()) { + HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight(); + String filePath = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny().get(); + if (filePath.contains(HoodieLogFile.DELTA_EXTENSION)) { + // this is a log file + return readSchemaFromLogFile(new Path(filePath)); + } else if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) { + // this is a parquet file + return readSchemaFromBaseFile(new Path(filePath)); + } else { + throw new IllegalArgumentException("Unknown file format :" + filePath); } } else { - return readSchemaFromLastCompaction(lastCompactionCommit); + throw new IllegalArgumentException("Could not find any data file written for commit, " + + "so could not get schema for table " + metaClient.getBasePath()); } default: LOG.error("Unknown table type " + metaClient.getTableType()); @@ -484,21 +457,6 @@ public class TableSchemaResolver { return readSchemaFromLogFile(metaClient.getRawFs(), path); } - /** - * Read the schema from the log file on path. - * @throws Exception - */ - public MessageType readSchemaFromLogFile(Option<HoodieInstant> lastCompactionCommitOpt, Path path) - throws Exception { - MessageType messageType = readSchemaFromLogFile(path); - // Fall back to read the schema from last compaction - if (messageType == null) { - LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt); - return readSchemaFromLastCompaction(lastCompactionCommitOpt); - } - return messageType; - } - /** * Read the schema from the log file on path.
* diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 613cdb5a6..2fa0b95fd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -19,11 +19,13 @@ package org.apache.hudi.common.table.timeline; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; +import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hadoop.fs.FSDataInputStream; @@ -39,11 +41,14 @@ import java.io.Serializable; import java.text.ParseException; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashSet; +import java.util.List; import java.util.Objects; import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; /** * Represents the Active Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -254,6 +259,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { return readDataFromPath(detailPath); } + /** + * Get the last instant with valid data written, and convert it to HoodieCommitMetadata. + */ + public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWithValidData() { + List<HoodieInstant> completed = getCommitsTimeline().filterCompletedInstants().getInstants() + .sorted(Comparator.comparing(HoodieInstant::getTimestamp).reversed()).collect(Collectors.toList()); + for (HoodieInstant instant : completed) { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + getInstantDetails(instant).get(), HoodieCommitMetadata.class); + if (!commitMetadata.getFileIdAndRelativePaths().isEmpty()) { + return Option.of(Pair.of(instant, commitMetadata)); + } + } catch (IOException e) { + LOG.warn("Failed to convert instant to HoodieCommitMetadata: " + instant.toString()); + } + } + return Option.empty(); + } + public Option<byte[]> readCleanerInfoAsBytes(HoodieInstant instant) { // Cleaner metadata are always stored only in timeline .hoodie return readDataFromPath(new Path(metaClient.getMetaPath(), instant.getFileName())); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java index 15691f14f..2cf111e91 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java @@ -18,8 +18,6 @@ package org.apache.hudi.common.table.timeline; -import org.apache.hudi.common.model.HoodieCommitMetadata; -import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; @@ -102,12 +100,6 @@ public class HoodieDefaultTimeline implements HoodieTimeline { return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted), details); }
- @Override - public HoodieTimeline filterCompletedInstantsWithCommitMetadata() { - return new HoodieDefaultTimeline(instants.stream().filter(HoodieInstant::isCompleted) - .filter(i -> !isDeletePartitionType(i)), details); - } - @Override public HoodieTimeline filterCompletedAndCompactionInstants() { return new HoodieDefaultTimeline(instants.stream().filter(s -> s.isCompleted() @@ -359,21 +351,6 @@ public class HoodieDefaultTimeline implements HoodieTimeline { return details.apply(instant); } - @Override - public boolean isDeletePartitionType(HoodieInstant instant) { - Option<WriteOperationType> operationType; - - try { - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(getInstantDetails(instant).get(), HoodieCommitMetadata.class); - operationType = Option.of(commitMetadata.getOperationType()); - } catch (Exception e) { - operationType = Option.empty(); - } - - return operationType.isPresent() && WriteOperationType.DELETE_PARTITION.equals(operationType.get()); - } - @Override public boolean isEmpty(HoodieInstant instant) { return getInstantDetails(instant).get().length == 0; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java index 3b2779ca3..6ea44a830 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java @@ -131,14 +131,6 @@ public interface HoodieTimeline extends Serializable { */ HoodieTimeline filterCompletedAndCompactionInstants(); - /** - * Filter this timeline to include the completed and exclude operation type is delete partition instants. - * - * @return New instance of HoodieTimeline with include the completed and - * exclude operation type is delete partition instants - */ - HoodieTimeline filterCompletedInstantsWithCommitMetadata(); - /** * Timeline to just include commits (commit/deltacommit), compaction and replace actions. * @@ -291,11 +283,6 @@ boolean isEmpty(HoodieInstant instant); - /** - * Check WriteOperationType is DeletePartition. - */ - boolean isDeletePartitionType(HoodieInstant instant); - /** * Helper methods to compare instants. **/ diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala new file mode 100644 index 000000000..03bd2fe07 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieActiveTimeline.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.hudi.{DataSourceWriteOptions, HoodieDataSourceHelpers} + +import org.apache.log4j.LogManager + +import org.apache.spark.sql._ + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.JavaConversions._ + +/** + * Tests on HoodieActiveTimeline using a real hudi table. + */ +class TestHoodieActiveTimeline extends HoodieClientTestBase { + + var spark: SparkSession = null + + private val log = LogManager.getLogger(classOf[TestHoodieActiveTimeline]) + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + @BeforeEach + override def setUp() { + setTableName("hoodie_test") + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + } + + @AfterEach + override def tearDown() = { + cleanupSparkContexts() + cleanupTestDataGenerator() + cleanupFileSystem() + } + + @Test + def testGetLastCommitMetadataWithValidDataForCOW(): Unit = { + // First Operation: + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + val commit1Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val partitionsForCommit1 = spark.read.format("org.apache.hudi").load(basePath) + .select("_hoodie_partition_path") + .distinct().collect() + .map(_.get(0).toString).sorted + assert(Array("2015/03/16", "2015/03/17", "2016/03/15").sameElements(partitionsForCommit1)) + + val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build() + var activeTimeline = metaClient.getActiveTimeline + + // check that we get the latest parquet file + val ret1 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret1.isPresent) + val (instant1, commitMetadata1) = (ret1.get().getLeft, ret1.get().getRight) + assertEquals(instant1.getTimestamp, commit1Time) + val relativePath1 = commitMetadata1.getFileIdAndRelativePaths.values().stream().findAny().get() + assert(relativePath1.contains(commit1Time)) + assert(relativePath1.contains(HoodieFileFormat.PARQUET.getFileExtension)) + + // Second Operation: + // Drop Partition on 2015/03/16 + spark.emptyDataFrame.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key, "2015/03/16")
+ .mode(SaveMode.Append) + .save(basePath) + val commit2Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val countPartitionDropped = spark.read.format("org.apache.hudi").load(basePath) + .where("_hoodie_partition_path = '2015/03/16'").count() + assertEquals(countPartitionDropped, 0) + + // DropPartition will not generate a file with valid data, so get the previous instant and metadata. + activeTimeline = activeTimeline.reload() + val ret2 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret2.isPresent) + val (instant2, commitMetadata2) = (ret2.get().getLeft, ret2.get().getRight) + assertEquals(instant2.getTimestamp, commit1Time) + val relativePath2 = commitMetadata2.getFileIdAndRelativePaths.values().stream().findAny().get() + assert(relativePath2.contains(commit1Time)) + assert(relativePath2.contains(HoodieFileFormat.PARQUET.getFileExtension)) + + // Third Operation: + // Upsert with 50 duplicate records. + val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList + val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val commit3Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + // check that we get the latest parquet file written by the upsert + activeTimeline = activeTimeline.reload() + val ret3 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret3.isPresent) + val (instant3, commitMetadata3) = (ret3.get().getLeft, ret3.get().getRight) + assertEquals(instant3.getTimestamp, commit3Time) + val relativePath3 = commitMetadata3.getFileIdAndRelativePaths.values().stream().findAny().get() + assert(relativePath3.contains(commit3Time)) + assert(relativePath3.contains(HoodieFileFormat.PARQUET.getFileExtension)) + } + + @Test + def testGetLastCommitMetadataWithValidDataForMOR(): Unit = { + // First Operation: + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(basePath) + val commit1Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + val metaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(hadoopConf).build() + var activeTimeline = metaClient.getActiveTimeline + + // check that we get the latest parquet file + val ret1 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret1.isPresent) + val (instant1, commitMetadata1) = (ret1.get().getLeft, ret1.get().getRight) + assertEquals(instant1.getTimestamp, commit1Time) + val relativePath1 = commitMetadata1.getFileIdAndRelativePaths.values().stream().findAny().get() + assert(relativePath1.contains(commit1Time)) + assert(relativePath1.contains(HoodieFileFormat.PARQUET.getFileExtension)) + + // Second Operation: + // Upsert with duplicate records. This produces a log file for each parquet file.
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val commit2Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + // check that we get the latest .log file + activeTimeline = activeTimeline.reload() + val ret2 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret2.isPresent) + val (instant2, commitMetadata2) = (ret2.get().getLeft, ret2.get().getRight) + assertEquals(instant2.getTimestamp, commit2Time) + val relativePath2 = commitMetadata2.getFileIdAndRelativePaths.values().stream().findAny().get() + // deltacommit: the .log file name should contain the timestamp of its base parquet file. + assert(relativePath2.contains(commit1Time)) + assert(relativePath2.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension)) + + // Third Operation: + // Upsert with 50 duplicate records, producing a second log file for each parquet file, + // and trigger compaction. + val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList + val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2)) + inputDF3.write.format("org.apache.hudi") + .options(commonOpts).option("hoodie.compact.inline", "true") + .option("hoodie.compact.inline.max.delta.commits", "1") + .mode(SaveMode.Append).save(basePath) + val commit3Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + // check that we get the latest parquet file generated by compaction + activeTimeline = activeTimeline.reload() + val ret3 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret3.isPresent) + val (instant3, commitMetadata3) = (ret3.get().getLeft, ret3.get().getRight) + assertEquals(instant3.getTimestamp, commit3Time) + val relativePath3 = commitMetadata3.getFileIdAndRelativePaths.values().stream().findAny().get() + assert(relativePath3.contains(commit3Time)) + assert(relativePath3.contains(HoodieFileFormat.PARQUET.getFileExtension)) + + // Fourth Operation: + // Upsert with 50 duplicate records. + val records4 = recordsToStrings(dataGen.generateUniqueUpdates("004", 50)).toList + val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2)) + inputDF4.write.format("org.apache.hudi") + .options(commonOpts) + .mode(SaveMode.Append) + .save(basePath) + val commit4Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + activeTimeline = activeTimeline.reload() + val ret4 = activeTimeline.getLastCommitMetadataWithValidData() + assert(ret4.isPresent) + val (instant4, commitMetadata4) = (ret4.get().getLeft, ret4.get().getRight) + assertEquals(instant4.getTimestamp, commit4Time) + val relativePath4 = commitMetadata4.getFileIdAndRelativePaths.values().stream().findAny().get() + // deltacommit: the .log file name should contain the timestamp of its base parquet file. + assert(relativePath4.contains(commit3Time)) + assert(relativePath4.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension)) + } +}
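
Note for reviewers: the minimal sketch below is not part of the patch; it only illustrates how the HoodieActiveTimeline#getLastCommitMetadataWithValidData API introduced above can be consumed by a caller. The class name LastCommitWithDataExample and the command-line basePath argument are hypothetical, and the sketch assumes a Hudi table already exists at basePath and that at least one completed commit has written data files.

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

public class LastCommitWithDataExample {
  public static void main(String[] args) {
    String basePath = args[0]; // hypothetical table location passed on the command line
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setBasePath(basePath)
        .setConf(new Configuration())
        .build();

    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
    // New in this patch: walk completed commits from newest to oldest and return the first
    // one whose commit metadata references at least one written file.
    Option<Pair<HoodieInstant, HoodieCommitMetadata>> lastWithData =
        timeline.getLastCommitMetadataWithValidData();

    if (lastWithData.isPresent()) {
      HoodieInstant instant = lastWithData.get().getLeft();
      HoodieCommitMetadata metadata = lastWithData.get().getRight();
      // Any of the referenced files can be used to read the table schema, which is what
      // TableSchemaResolver#getTableParquetSchemaFromDataFile now does with this result.
      String anyFile = metadata.getFileIdAndFullPaths(basePath).values().iterator().next();
      System.out.println("Last commit with data: " + instant.getTimestamp() + ", sample file: " + anyFile);
    } else {
      System.out.println("No completed commit has written any data files yet.");
    }
  }
}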