diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java index 0dafba4e5..b98417ef2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java @@ -296,6 +296,9 @@ public class DataSourceUtils { SlashEncodedDayPartitionValueExtractor.class.getName()); hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(), DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue())); + if (props.containsKey(DataSourceWriteOptions.HIVE_SYNC_MODE().key())) { + hiveSyncConfig.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key()); + } hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(), DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue())); hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(), diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java index 081a8e4e6..6353aa216 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDataSourceUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; @@ -27,6 +28,7 @@ import 
org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner; +import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Conversions; @@ -41,6 +43,8 @@ import org.apache.spark.sql.Row; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; @@ -49,13 +53,18 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.math.BigDecimal; import java.time.LocalDate; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.hive.ddl.HiveSyncMode.HMS; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.times; @@ -65,6 +74,9 @@ import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) public class TestDataSourceUtils { + private static final String HIVE_DATABASE = "testdb1"; + private static final String HIVE_TABLE = "hive_trips"; + @Mock private SparkRDDWriteClient hoodieWriteClient; @@ -199,6 +211,29 @@ public 
class TestDataSourceUtils { assertThat(partitioner.isPresent(), is(true)); } + /** Checks that DataSourceUtils.buildHiveSyncConfig carries the Hive sync options from the supplied properties into the returned HiveSyncConfig. Runs once per value of useSyncMode: when true, the sync mode is set to HMS and the use-JDBC option is set to false on the properties, and the test expects useJdbc to be false and syncMode to equal HMS.name(); when false, neither option is set and the test expects useJdbc to be true and syncMode to be null. The database and table names are set and verified in both runs. @param useSyncMode whether the sync-mode and use-JDBC options are set on the properties before building the config */ @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testBuildHiveSyncConfig(boolean useSyncMode) { + TypedProperties props = new TypedProperties(); + /* Only the sync-mode run sets these two options explicitly; the other run leaves them absent. */ if (useSyncMode) { + props.setProperty(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), HMS.name()); + props.setProperty(DataSourceWriteOptions.HIVE_USE_JDBC().key(), String.valueOf(false)); + } + props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), HIVE_DATABASE); + props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), HIVE_TABLE); + HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, config.getBasePath(), PARQUET.name()); + + /* Expectations mirror the setup branch above: explicit values when set, otherwise useJdbc true and no sync mode. */ if (useSyncMode) { + assertFalse(hiveSyncConfig.useJdbc); + assertEquals(HMS.name(), hiveSyncConfig.syncMode); + } else { + assertTrue(hiveSyncConfig.useJdbc); + assertNull(hiveSyncConfig.syncMode); + } + /* Database and table names are set unconditionally, so they are checked in both runs. */ assertEquals(HIVE_DATABASE, hiveSyncConfig.databaseName); + assertEquals(HIVE_TABLE, hiveSyncConfig.tableName); + } + private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) { config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath()) .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)