diff --git a/README.md b/README.md index 4cb363ab9..764ba0d48 100644 --- a/README.md +++ b/README.md @@ -21,8 +21,8 @@ Hudi manages the storage of large analytical datasets on DFS (Cloud stores, HDFS -[![Build Status](https://travis-ci.com/apache/hudi.svg?branch=master)](https://travis-ci.com/apache/hudi) -[![Build Status](https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_apis/build/status/apachehudi-ci.hudi-mirror?branchName=master)](https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/latest?definitionId=3&branchName=master) +[![Build](https://github.com/apache/hudi/actions/workflows/bot.yml/badge.svg)](https://github.com/apache/hudi/actions/workflows/bot.yml) +[![Test](https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_apis/build/status/apachehudi-ci.hudi-mirror?branchName=master)](https://dev.azure.com/apache-hudi-ci-org/apache-hudi-ci/_build/latest?definitionId=3&branchName=master) [![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.hudi/hudi/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.hudi%22) [![Join on Slack](https://img.shields.io/badge/slack-%23hudi-72eff8?logo=slack&color=48c628&label=Join%20on%20Slack)](https://join.slack.com/t/apache-hudi/shared_invite/enQtODYyNDAxNzc5MTg2LTE5OTBlYmVhYjM0N2ZhOTJjOWM4YzBmMWU2MjZjMGE4NDc5ZDFiOGQ2N2VkYTVkNzU3ZDQ4OTI1NmFmYWQ0NzE) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index a0bf6a75a..ef68cd52e 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -19,7 +19,7 @@ trigger: - '*' # must quote since "*" is a YAML reserved character; we want a string pool: - vmImage: 'ubuntu-16.04' + vmImage: 'ubuntu-18.04' variables: MAVEN_CACHE_FOLDER: $(Pipeline.Workspace)/.m2/repository diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java index 616dc3173..0ab6f32dc 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java @@ -18,14 +18,16 @@ package org.apache.hudi.callback.http; +import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient; + import org.apache.http.StatusLine; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -39,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -63,6 +66,11 @@ public class TestCallbackHttpClient { @Mock StatusLine statusLine; + @AfterEach + void resetMocks() { + reset(appender, httpClient, httpResponse, statusLine); + } + private void mockResponse(int statusCode) { when(statusLine.getStatusCode()).thenReturn(statusCode); when(httpResponse.getStatusLine()).thenReturn(statusLine); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java index 5767d189d..642c46baf 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogHttpClient.java @@ -27,6 +27,7 @@ import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -46,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -67,6 +69,11 @@ public class TestDatadogHttpClient { @Mock StatusLine statusLine; + @AfterEach + void resetMocks() { + reset(appender, httpClient, httpResponse, statusLine); + } + private void mockResponse(int statusCode) { when(statusLine.getStatusCode()).thenReturn(statusCode); when(httpResponse.getStatusLine()).thenReturn(statusLine); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java index 1654e1648..607cce9ee 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/datadog/TestDatadogReporter.java @@ -28,6 +28,7 @@ import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; @@ -41,6 +42,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; @ExtendWith(MockitoExtension.class) @@ -58,6 +60,11 @@ public class TestDatadogReporter { @Mock DatadogHttpClient client; + @AfterEach + void resetMocks() { + reset(appender, registry, client); + } + @Test public void stopShouldCloseEnclosedClient() throws IOException { new DatadogReporter(registry, client, "foo", Option.empty(), Option.empty(), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index efc143000..bfd4423db 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -156,123 +156,6 @@ class TestCOWDataSource extends HoodieClientTestBase { assertEquals(snapshotDF1.count() - inputDF2.count(), snapshotDF2.count()) } - - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testCopyOnWriteStorage(isMetadataEnabled: Boolean) { - // Insert Operation - val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList - val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDF1.write.format("org.apache.hudi") - .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Overwrite) - .save(basePath) - - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) - val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) - - // Snapshot query - val snapshotDF1 = spark.read.format("org.apache.hudi") - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath + "/*/*/*") - assertEquals(100, snapshotDF1.count()) - - // Upsert based on the written table with Hudi metadata columns - val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0) - val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) - - updateDf.write.format("org.apache.hudi") - .options(commonOpts) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Append) - .save(basePath) - val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath) - - val snapshotDF2 = spark.read.format("hudi") - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath + "/*/*/*") - assertEquals(100, snapshotDF2.count()) - assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) - - // Upsert Operation without Hudi metadata columns - val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList - val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2)) - val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() - - inputDF2.write.format("org.apache.hudi") - .options(commonOpts) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Append) - .save(basePath) - - val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath) - assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size()) - - // Snapshot Query - val snapshotDF3 = spark.read.format("org.apache.hudi") - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath + "/*/*/*") - assertEquals(100, snapshotDF3.count()) // still 100, since we only updated - - // Read Incremental Query - // we have 2 commits, try pulling the first commit (which is not the latest) - val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0) - val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") - .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit) - .load(basePath) - assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled - var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect() - assertEquals(1, countsPerCommit.length) - assertEquals(firstCommit, countsPerCommit(0).get(0)) - - // Test incremental query has no instant in range - val emptyIncDF = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") - .option(DataSourceReadOptions.END_INSTANTTIME.key, "001") - .load(basePath) - assertEquals(0, emptyIncDF.count()) - - // Upsert an empty dataFrame - val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList - val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1)) - emptyDF.write.format("org.apache.hudi") - .options(commonOpts) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Append) - .save(basePath) - - // pull the latest commit - val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2) - .load(basePath) - - assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled - countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect() - assertEquals(1, countsPerCommit.length) - assertEquals(commitInstantTime3, countsPerCommit(0).get(0)) - - // pull the latest commit within certain partitions - val hoodieIncViewDF3 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2) - .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*") - .load(basePath) - assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count()) - - val timeTravelDF = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) - .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") - .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit) - .load(basePath) - assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled - } - @Test def testOverWriteModeUseReplaceAction(): Unit = { val records1 = recordsToStrings(dataGen.generateInserts("001", 5)).toList val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala new file mode 100644 index 000000000..9524adbd5 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.{col, lit} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Tag +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import scala.collection.JavaConversions._ + + +@Tag("functional") +class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testCopyOnWriteStorage(isMetadataEnabled: Boolean): Unit = { + val dataGen = new HoodieTestDataGenerator() + val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) + // Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + // Snapshot query + val snapshotDF1 = spark.read.format("org.apache.hudi") + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .load(basePath) + assertEquals(100, snapshotDF1.count()) + + // Upsert based on the written table with Hudi metadata columns + val verificationRowKey = snapshotDF1.limit(1).select("_row_key").first.getString(0) + val updateDf = snapshotDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) + + updateDf.write.format("org.apache.hudi") + .options(commonOpts) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Append) + .save(basePath) + val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + val snapshotDF2 = spark.read.format("hudi") + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .load(basePath) + assertEquals(100, snapshotDF2.count()) + assertEquals(updatedVerificationVal, snapshotDF2.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) + + // Upsert Operation without Hudi metadata columns + val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2 , 2)) + val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count() + + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Append) + .save(basePath) + + val commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + assertEquals(3, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size()) + + // Snapshot Query + val snapshotDF3 = spark.read.format("org.apache.hudi") + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .load(basePath) + assertEquals(100, snapshotDF3.count()) // still 100, since we only updated + + // Read Incremental Query + // we have 2 commits, try pulling the first commit (which is not the latest) + val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0) + val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") + .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit) + .load(basePath) + assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled + var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(firstCommit, countsPerCommit(0).get(0)) + + // Test incremental query has no instant in range + val emptyIncDF = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") + .option(DataSourceReadOptions.END_INSTANTTIME.key, "001") + .load(basePath) + assertEquals(0, emptyIncDF.count()) + + // Upsert an empty dataFrame + val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList + val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1)) + emptyDF.write.format("org.apache.hudi") + .options(commonOpts) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Append) + .save(basePath) + + // pull the latest commit + val hoodieIncViewDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2) + .load(basePath) + + assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled + countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect() + assertEquals(1, countsPerCommit.length) + assertEquals(commitInstantTime3, countsPerCommit(0).get(0)) + + // pull the latest commit within certain partitions + val hoodieIncViewDF3 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2) + .option(DataSourceReadOptions.INCR_PATH_GLOB.key, "/2016/*/*/*") + .load(basePath) + assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count()) + + val timeTravelDF = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000") + .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit) + .load(basePath) + assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index f9409e07e..ee914aec5 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -20,7 +20,6 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.HoodieMetadataConfig -import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieTableType} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.testutils.HoodieTestDataGenerator @@ -37,7 +36,7 @@ import org.apache.spark.sql.types.BooleanType import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.{CsvSource, ValueSource} +import org.junit.jupiter.params.provider.CsvSource import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -75,72 +74,6 @@ class TestMORDataSource extends HoodieClientTestBase { cleanupFileSystem() } - @ParameterizedTest - @ValueSource(booleans = Array(true, false)) - def testMergeOnReadStorage(isMetadataEnabled: Boolean) { - - val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) - // Bulk Insert Operation - val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList - val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) - inputDF1.write.format("org.apache.hudi") - .options(commonOpts) - .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same - .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Overwrite) - .save(basePath) - - assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) - - // Read RO View - val hudiRODF1 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath + "/*/*/*") - - assertEquals(100, hudiRODF1.count()) // still 100, since we only updated - val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) - val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList - assertEquals(List(insertCommitTime), insertCommitTimes) - - // Upsert operation without Hudi metadata columns - val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList - val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) - inputDF2.write.format("org.apache.hudi") - .options(commonOpts) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Append) - .save(basePath) - - // Read Snapshot query - val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) - val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") - .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath + "/*/*/*") - - val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList - assertEquals(List(updateCommitTime), updateCommitTimes) - - // Upsert based on the written table with Hudi metadata columns - val verificationRowKey = hudiSnapshotDF2.limit(1).select("_row_key").first.getString(0) - val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) - - inputDF3.write.format("org.apache.hudi") - .options(commonOpts) - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .mode(SaveMode.Append) - .save(basePath) - - val hudiSnapshotDF3 = spark.read.format("hudi") - .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) - .load(basePath + "/*/*/*") - assertEquals(100, hudiSnapshotDF3.count()) - assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) - } - @Test def testCount() { // First Operation: // Producing parquet files to three default partitions. diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala new file mode 100644 index 000000000..315a14c9d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.functional + +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions.{col, lit} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Tag +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import scala.collection.JavaConversions._ + + +@Tag("functional") +class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness { + + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) + + val verificationCol: String = "driver" + val updatedVerificationVal: String = "driver_update" + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testMergeOnReadStorage(isMetadataEnabled: Boolean) { + val dataGen = new HoodieTestDataGenerator() + val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration) + // Bulk Insert Operation + val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList + val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2)) + inputDF1.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Overwrite) + .save(basePath) + + assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000")) + + // Read RO View + val hudiRODF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .load(basePath) + + assertEquals(100, hudiRODF1.count()) // still 100, since we only updated + val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList + assertEquals(List(insertCommitTime), insertCommitTimes) + + // Upsert operation without Hudi metadata columns + val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList + val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2)) + inputDF2.write.format("org.apache.hudi") + .options(commonOpts) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Append) + .save(basePath) + + // Read Snapshot query + val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .load(basePath) + + val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList + assertEquals(List(updateCommitTime), updateCommitTimes) + + // Upsert based on the written table with Hudi metadata columns + val verificationRowKey = hudiSnapshotDF2.limit(1).select("_row_key").first.getString(0) + val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) + + inputDF3.write.format("org.apache.hudi") + .options(commonOpts) + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .mode(SaveMode.Append) + .save(basePath) + + val hudiSnapshotDF3 = spark.read.format("hudi") + .option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled) + .load(basePath) + assertEquals(100, hudiSnapshotDF3.count()) + assertEquals(updatedVerificationVal, hudiSnapshotDF3.filter(col("_row_key") === verificationRowKey).select(verificationCol).first.getString(0)) + } +}