1
0

[HUDI-2970] Adding tests for archival of replace commit actions (#4268)

This commit is contained in:
Sivabalan Narayanan
2021-12-18 23:59:39 -08:00
committed by GitHub
parent 478f9f3695
commit 03f71ef1a2

View File

@@ -21,6 +21,8 @@ package org.apache.hudi.functional
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
@@ -28,11 +30,14 @@ import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{Arguments, CsvSource, ValueSource}
import java.util
import java.util.Arrays
import java.util.stream.Stream
import scala.collection.JavaConversions._
@@ -170,4 +175,79 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.load(basePath)
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
}
@ParameterizedTest
@ValueSource(strings = Array("insert_overwrite", "delete_partition"))
def testArchivalWithReplaceCommitActions(writeOperation: String): Unit = {
  // Verifies that replace commit actions (produced by insert_overwrite and
  // delete_partition) get archived: once enough commits accumulate they must
  // disappear from the active timeline and appear in the archived timeline.
  val dataGen = new HoodieTestDataGenerator()
  // Generators scoped to a single partition each, so writes can target one partition.
  val dataGenPartition1 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH))
  val dataGenPartition2 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))
  // Seed the table: one bulk insert spanning all partitions.
  val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList
  val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
  val partition1RecordCount = inputDF.filter(row => row.getAs("partition_path")
    .equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).count()
  inputDF.write.format("hudi")
    .options(commonOpts)
    // Aggressive archival/cleaning settings so archival triggers within a few commits.
    .option("hoodie.keep.min.commits", "2")
    .option("hoodie.keep.max.commits", "3")
    .option("hoodie.cleaner.commits.retained", "1")
    .option("hoodie.metadata.enable", "false")
    .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
    .mode(SaveMode.Overwrite)
    .save(basePath)
  assertRecordCount(basePath, 100)
  // Issue the replace commit action (insert_overwrite or delete_partition) against partition1.
  writeRecords(2, dataGenPartition1, writeOperation, basePath)
  // insert_overwrite replaces partition1's rows with 100 new ones; delete_partition drops them.
  val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
    200 - partition1RecordCount
  } else {
    100 - partition1RecordCount
  }
  assertRecordCount(basePath, expectedRecCount)
  // Add more commits to partition2 so the earlier replace commit falls out of the retained window.
  for (i <- 3 to 7) {
    writeRecords(i, dataGenPartition2, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, basePath)
  }
  assertRecordCount(basePath, expectedRecCount + 500)
  val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
    .setLoadActiveTimelineOnLoad(true).build()
  val activeActions = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
    .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
  // The replace commit must have been archived off the active timeline...
  assertFalse(activeActions.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
  // ...and must be present in the archived timeline.
  val archivedTimeline = metaClient.getArchivedTimeline
  assertTrue(archivedTimeline.getInstants.toArray
    .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
    .exists(action => action.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)))
}
def writeRecords(commitTime: Int, dataGen: HoodieTestDataGenerator, writeOperation: String, basePath: String): Unit = {
  // Appends 100 freshly generated records to the table at `basePath` using the
  // supplied write operation, keeping the same aggressive archival/cleaning
  // settings as the initial bulk insert so archival can trigger quickly.
  val recordStrs = recordsToStrings(dataGen.generateInserts("%05d".format(commitTime), 100)).toList
  val df = spark.read.json(spark.sparkContext.parallelize(recordStrs, 2))
  val writer = df.write.format("hudi")
    .options(commonOpts)
    .option("hoodie.keep.min.commits", "2")
    .option("hoodie.keep.max.commits", "3")
    .option("hoodie.cleaner.commits.retained", "1")
    .option("hoodie.metadata.enable", "false")
    .option(DataSourceWriteOptions.OPERATION.key, writeOperation)
  writer.mode(SaveMode.Append).save(basePath)
}
def assertRecordCount(basePath: String, expectedRecordCount: Long): Unit = {
  // Loads a snapshot of the table via globbed partition paths and checks the row count.
  val snapshot = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
  val actualCount = snapshot.count()
  assertEquals(expectedRecordCount, actualCount)
}
}