[HUDI-2911] Removing default value for PARTITIONPATH_FIELD_NAME resulting in incorrect KeyGenerator configuration (#4195)
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Class collecting common utilities helping in handling {@link Exception}s
+ */
+public final class ExceptionUtil {
+  private ExceptionUtil() {}
+
+  /**
+   * Fetches inner-most cause of the provided {@link Throwable}
+   */
+  @Nonnull
+  public static Throwable getRootCause(@Nonnull Throwable t) {
+    Throwable cause = t;
+    while (cause.getCause() != null) {
+      cause = cause.getCause();
+    }
+
+    return cause;
+  }
+
+}
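
For readers new to the utility above, a minimal usage sketch in Scala; runRiskyWrite() and the try/catch shape are hypothetical stand-ins for any call that fails several exception layers deep, not code from this commit:

import java.io.IOException
import org.apache.hudi.exception.ExceptionUtil

// Hypothetical failure that arrives wrapped in another exception.
def runRiskyWrite(): Unit = throw new RuntimeException(new IOException("disk full"))

try {
  runRiskyWrite()
} catch {
  case t: Throwable =>
    // getRootCause walks the cause chain to the inner-most Throwable.
    ExceptionUtil.getRootCause(t) match {
      case io: IOException => println(s"root cause: ${io.getMessage}")
      case other => throw other
    }
}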
@@ -52,7 +52,7 @@ public class KeyGeneratorOptions extends HoodieConfig {
 
   public static final ConfigProperty<String> PARTITIONPATH_FIELD_NAME = ConfigProperty
       .key("hoodie.datasource.write.partitionpath.field")
-      .defaultValue("partitionpath")
+      .noDefaultValue()
      .withDocumentation("Partition path field. Value to be used at the partitionPath component of HoodieKey. "
          + "Actual value obtained by invoking .toString()");
 
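
With the default gone, a datasource write must name the partition path field explicitly. A minimal sketch of a Spark write, assuming an existing DataFrame df; the table name, column names, and path are placeholders:

// Minimal sketch: hoodie.datasource.write.partitionpath.field no longer has a
// default, so it must be supplied (or a non-partitioned key generator chosen).
df.write.format("hudi")
  .option("hoodie.table.name", "my_table")
  .option("hoodie.datasource.write.recordkey.field", "uuid")
  .option("hoodie.datasource.write.partitionpath.field", "partition")
  .mode("append")
  .save("/tmp/hudi/my_table")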
@@ -676,7 +676,7 @@ object DataSourceWriteOptions {
   val PARTITIONPATH_FIELD_OPT_KEY = KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()
   /** @deprecated Use {@link PARTITIONPATH_FIELD} and its methods instead */
   @Deprecated
-  val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = PARTITIONPATH_FIELD.defaultValue()
+  val DEFAULT_PARTITIONPATH_FIELD_OPT_VAL = null
 
   /** @deprecated Use {@link TABLE_NAME} and its methods instead */
   @Deprecated
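
As the deprecation notes above direct, callers should move from the old constant pairs to the ConfigProperty-based API; a brief migration sketch, with the option value as a placeholder:

import org.apache.hudi.DataSourceWriteOptions

// Illustrative only: prefer the ConfigProperty constant and its key() over the
// deprecated *_OPT_KEY / DEFAULT_*_OPT_VAL pair, which no longer carries a
// usable default after this change.
val partitionKey = DataSourceWriteOptions.PARTITIONPATH_FIELD.key()
val opts = Map(partitionKey -> "partition") // "partition" is a placeholder column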
@@ -56,7 +56,6 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
     hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
     hoodieConfig.setDefaultValue(RECORDKEY_FIELD)
-    hoodieConfig.setDefaultValue(PARTITIONPATH_FIELD)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
     hoodieConfig.setDefaultValue(ENABLE)
     hoodieConfig.setDefaultValue(COMMIT_METADATA_KEYPREFIX)
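
The practical effect of dropping that line, sketched below under the assumption that this hunk sits in HoodieWriterUtils.parametersWithWriteDefaults (the method name is inferred from context, not shown in the diff):

import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}

// With no default to inject, the defaulting pass leaves the partition path key
// absent, so the key generator reflects the user's actual intent instead of a
// literal "partitionpath" column that rarely exists in the schema.
val params = HoodieWriterUtils.parametersWithWriteDefaults(Map("hoodie.table.name" -> "my_table"))
assert(!params.contains(DataSourceWriteOptions.PARTITIONPATH_FIELD.key()))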
@@ -25,8 +25,9 @@ import org.apache.hudi.common.config.HoodieConfig
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.util.PartitionPathEncodeUtils
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.exception.{ExceptionUtil, HoodieException}
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
 import org.apache.hudi.functional.TestBootstrap
 import org.apache.hudi.hive.HiveSyncConfig
@@ -40,13 +41,16 @@ import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
+import org.scalatest.Assertions.assertThrows
 import org.scalatest.Matchers.{assertResult, be, convertToAnyShouldWrapper, intercept}
 
+import java.io.IOException
 import java.time.Instant
 import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
@@ -346,7 +350,6 @@ class TestHoodieSparkSqlWriter {
   @Test
   def testInsertDatasetWithoutPrecombineField(): Unit = {
 
-    //create a new table
     val fooTableModifier = commonTableModifier.updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .updated(DataSourceWriteOptions.INSERT_DROP_DUPS.key, "false")
 
@@ -375,6 +378,28 @@ class TestHoodieSparkSqlWriter {
     assert(df.except(trimmedDf).count() == 0)
   }
 
+  /**
+   * Test case for insert dataset without partitioning field
+   */
+  @Test
+  def testInsertDatasetWithoutPartitionField(): Unit = {
+    val tableOpts =
+      commonTableModifier
+        .updated(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+
+    // generate the inserts
+    val schema = DataSourceTestUtils.getStructTypeExampleSchema
+    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val records = DataSourceTestUtils.generateRandomRows(1)
+    val recordsSeq = convertRowListToSeq(records)
+    val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+
+    // try write to Hudi
+    assertThrows[IOException] {
+      HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
+    }
+  }
+
   /**
    * Test case for bulk insert dataset with datasource impl multiple rounds.
    */
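
One hedged note on how the new test and the new utility fit together: assertThrows expects the IOException to surface directly, but where a failure arrives wrapped by Spark, a test can unwrap it with the ExceptionUtil added in this commit. optsWithoutPartitionPath is a placeholder option map, not a name from the diff:

// Assumed pattern, written against the test file's existing imports
// (intercept from scalatest, SaveMode, IOException).
val thrown = intercept[Exception] {
  HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, optsWithoutPartitionPath, df)
}
assert(ExceptionUtil.getRootCause(thrown).isInstanceOf[IOException])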