[HUDI-2484] Fix hive sync mode setting in Deltastreamer (#3712)
DataSourceUtils.java
@@ -296,6 +296,9 @@ public class DataSourceUtils {
         SlashEncodedDayPartitionValueExtractor.class.getName());
     hiveSyncConfig.useJdbc = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_USE_JDBC().key(),
         DataSourceWriteOptions.HIVE_USE_JDBC().defaultValue()));
+    if (props.containsKey(DataSourceWriteOptions.HIVE_SYNC_MODE().key())) {
+      hiveSyncConfig.syncMode = props.getString(DataSourceWriteOptions.HIVE_SYNC_MODE().key());
+    }
     hiveSyncConfig.autoCreateDatabase = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().key(),
         DataSourceWriteOptions.HIVE_AUTO_CREATE_DATABASE().defaultValue()));
     hiveSyncConfig.ignoreExceptions = Boolean.valueOf(props.getString(DataSourceWriteOptions.HIVE_IGNORE_EXCEPTIONS().key(),
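The three added lines above are the whole fix: buildHiveSyncConfig already populated useJdbc from HIVE_USE_JDBC, but never copied an explicitly configured HIVE_SYNC_MODE into HiveSyncConfig.syncMode, so the requested mode was silently dropped. A minimal sketch of the behavior after the fix, mirroring the new test added below; the base path is a placeholder and the "HMS" literal is the same value the test passes via HMS.name():

import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveSyncConfig;

public class HiveSyncModeSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Request HMS-based sync and turn off the JDBC path.
    props.setProperty(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), "HMS");
    props.setProperty(DataSourceWriteOptions.HIVE_USE_JDBC().key(), "false");
    // "/tmp/hudi_trips" is a placeholder base path for illustration only.
    HiveSyncConfig cfg = DataSourceUtils.buildHiveSyncConfig(props, "/tmp/hudi_trips", "PARQUET");
    // With the fix, cfg.syncMode is "HMS" and cfg.useJdbc is false; before
    // it, cfg.syncMode stayed null no matter what the property said.
    System.out.println("syncMode=" + cfg.syncMode + ", useJdbc=" + cfg.useJdbc);
  }
}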
TestDataSourceUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
+import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.avro.Conversions;
@@ -41,6 +43,8 @@ import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
@@ -49,13 +53,18 @@ import org.mockito.junit.jupiter.MockitoExtension;
 import java.math.BigDecimal;
 import java.time.LocalDate;
 
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.hive.ddl.HiveSyncMode.HMS;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.times;
@@ -65,6 +74,9 @@ import static org.mockito.Mockito.when;
 @ExtendWith(MockitoExtension.class)
 public class TestDataSourceUtils {
 
+  private static final String HIVE_DATABASE = "testdb1";
+  private static final String HIVE_TABLE = "hive_trips";
+
   @Mock
   private SparkRDDWriteClient hoodieWriteClient;
 
@@ -199,6 +211,29 @@ public class TestDataSourceUtils {
     assertThat(partitioner.isPresent(), is(true));
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testBuildHiveSyncConfig(boolean useSyncMode) {
+    TypedProperties props = new TypedProperties();
+    if (useSyncMode) {
+      props.setProperty(DataSourceWriteOptions.HIVE_SYNC_MODE().key(), HMS.name());
+      props.setProperty(DataSourceWriteOptions.HIVE_USE_JDBC().key(), String.valueOf(false));
+    }
+    props.setProperty(DataSourceWriteOptions.HIVE_DATABASE().key(), HIVE_DATABASE);
+    props.setProperty(DataSourceWriteOptions.HIVE_TABLE().key(), HIVE_TABLE);
+    HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, config.getBasePath(), PARQUET.name());
+
+    if (useSyncMode) {
+      assertFalse(hiveSyncConfig.useJdbc);
+      assertEquals(HMS.name(), hiveSyncConfig.syncMode);
+    } else {
+      assertTrue(hiveSyncConfig.useJdbc);
+      assertNull(hiveSyncConfig.syncMode);
+    }
+    assertEquals(HIVE_DATABASE, hiveSyncConfig.databaseName);
+    assertEquals(HIVE_TABLE, hiveSyncConfig.tableName);
+  }
+
   private void setAndVerifyHoodieWriteClientWith(final String partitionerClassName) {
     config = HoodieWriteConfig.newBuilder().withPath(config.getBasePath())
         .withUserDefinedBulkInsertPartitionerClass(partitionerClassName)
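In Deltastreamer terms, the same settings travel through the --props file and reach the patched buildHiveSyncConfig path when hive sync is enabled. A hedged sketch of such a props file; the literal key strings are what DataSourceWriteOptions resolved HIVE_SYNC_MODE, HIVE_USE_JDBC, HIVE_DATABASE, and HIVE_TABLE to at the time of this commit, and should be checked against your Hudi version:

# Hive sync settings for a Deltastreamer run (illustrative; values reuse
# the constants from the new test above)
hoodie.datasource.hive_sync.mode=hms
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.database=testdb1
hoodie.datasource.hive_sync.table=hive_trips

With this commit, a sync mode supplied this way is carried into HiveSyncConfig.syncMode instead of being dropped, so hive sync can actually run in the requested mode.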