1
0

[HUDI-1848] Adding support for HMS for running DDL queries in hive-sy… (#2879)

* [HUDI-1848] Adding support for HMS for running DDL queries in hive-sync-tool

* [HUDI-1848] Fixing test cases

* [HUDI-1848] CR changes

* [HUDI-1848] Fix checkstyle violations

* [HUDI-1848] Fixed a bug when metastore api fails for complex schemas with multiple levels.

* [HUDI-1848] Adding the complex schema and resolving merge conflicts

* [HUDI-1848] Adding some more javadocs

* [HUDI-1848] Added javadocs for DDLExecutor impls

* [HUDI-1848] Fixed style issue
This commit is contained in:
jsbali
2021-07-23 21:33:15 +05:30
committed by GitHub
parent 71e14cf866
commit 66207ed91a
15 changed files with 3327 additions and 503 deletions

View File

@@ -26,15 +26,22 @@ import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.ConfigUtils;
import org.apache.hudi.hive.util.HiveSchemaUtil;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.joda.time.DateTime;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -42,36 +49,65 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import static org.apache.hudi.hive.testutils.HiveTestUtil.ddlExecutor;
import static org.apache.hudi.hive.testutils.HiveTestUtil.fileSystem;
import static org.apache.hudi.hive.testutils.HiveTestUtil.hiveSyncConfig;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHiveSyncTool {
private static Stream<Boolean> useJdbc() {
return Stream.of(false, true);
private static final List<Object> SYNC_MODES = Arrays.asList(
"hms",
"hiveql",
"jdbc");
private static Iterable<Object> syncMode() {
return SYNC_MODES;
}
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
private static Iterable<Object[]> syncModeAndSchemaFromCommitMetadata() {
List<Object[]> opts = new ArrayList<>();
for (Object mode : SYNC_MODES) {
opts.add(new Object[] {true, mode});
opts.add(new Object[] {false, mode});
}
return opts;
}
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadataAndManagedTable() {
return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
@AfterAll
public static void cleanUpClass() {
HiveTestUtil.shutdown();
}
private static Iterable<Object[]> syncModeAndSchemaFromCommitMetadataAndManagedTable() {
List<Object[]> opts = new ArrayList<>();
for (Object mode : SYNC_MODES) {
opts.add(new Object[] {true, true, mode});
opts.add(new Object[] {false, false, mode});
}
return opts;
}
// (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource)
private static Iterable<Object[]> syncDataSourceTableParams() {
return Arrays.asList(new Object[][] {{true, true, true}, {true, false, false}, {false, true, true}, {false, false, false}});
List<Object[]> opts = new ArrayList<>();
for (Object mode : SYNC_MODES) {
opts.add(new Object[] {true, true, mode});
opts.add(new Object[] {false, false, mode});
}
return opts;
}
@BeforeEach
@@ -84,24 +120,130 @@ public class TestHiveSyncTool {
HiveTestUtil.clear();
}
@AfterAll
public static void cleanUpClass() {
HiveTestUtil.shutdown();
/**
* Testing converting array types to Hive field declaration strings.
* <p>
* Refer to the Parquet-113 spec: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*/
@Test
public void testSchemaConvertArray() throws IOException {
// Testing the 3-level annotation structure
MessageType schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.optional(PrimitiveType.PrimitiveTypeName.INT32).named("element").named("list").named("int_list")
.named("ArrayOfInts");
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A array of arrays
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().requiredGroup()
.as(OriginalType.LIST).repeatedGroup().required(PrimitiveType.PrimitiveTypeName.INT32).named("element")
.named("list").named("element").named("list").named("int_list_list").named("ArrayOfArrayOfInts");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list_list` ARRAY< ARRAY< int>>", schemaString);
// A list of integers
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeated(PrimitiveType.PrimitiveTypeName.INT32)
.named("element").named("int_list").named("ArrayOfInts");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`int_list` ARRAY< int>", schemaString);
// A list of structs with two fields
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").required(PrimitiveType.PrimitiveTypeName.INT32)
.named("num").named("element").named("tuple_list").named("ArrayOfTuples");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`tuple_list` ARRAY< STRUCT< `str` : binary, `num` : int>>", schemaString);
// A list of structs with a single field
// For this case, since the inner group name is "array", we treat the
// element type as a one-element struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("array").named("one_tuple_list")
.named("ArrayOfOneTuples");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
// A list of structs with a single field
// For this case, since the inner group name ends with "_tuple", we also treat the
// element type as a one-element struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list_tuple")
.named("one_tuple_list").named("ArrayOfOneTuples2");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< STRUCT< `str` : binary>>", schemaString);
// A list of structs with a single field
// Unlike the above two cases, for this the element type is the type of the
// only field in the struct.
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup()
.required(PrimitiveType.PrimitiveTypeName.BINARY).named("str").named("one_tuple_list").named("one_tuple_list")
.named("ArrayOfOneTuples3");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`one_tuple_list` ARRAY< binary>", schemaString);
// A list of maps
schema = Types.buildMessage().optionalGroup().as(OriginalType.LIST).repeatedGroup().as(OriginalType.MAP)
.repeatedGroup().as(OriginalType.MAP_KEY_VALUE).required(PrimitiveType.PrimitiveTypeName.BINARY)
.as(OriginalType.UTF8).named("string_key").required(PrimitiveType.PrimitiveTypeName.INT32).named("int_value")
.named("key_value").named("array").named("map_list").named("ArrayOfMaps");
schemaString = HiveSchemaUtil.generateSchemaString(schema);
assertEquals("`map_list` ARRAY< MAP< string, int>>", schemaString);
}
@Test
public void testSchemaConvertTimestampMicros() throws IOException {
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
String schemaString = HiveSchemaUtil.generateSchemaString(schema);
// verify backward compatibility - int64 converted to bigint type
assertEquals("`my_element` bigint", schemaString);
// verify new functionality - int64 converted to timestamp type when 'supportTimestamp' is enabled
schemaString = HiveSchemaUtil.generateSchemaString(schema, Collections.emptyList(), true);
assertEquals("`my_element` TIMESTAMP", schemaString);
}
@Test
public void testSchemaDiffForTimestampMicros() {
MessageType schema = Types.buildMessage().optional(PrimitiveType.PrimitiveTypeName.INT64)
.as(OriginalType.TIMESTAMP_MICROS).named("my_element").named("my_timestamp");
// verify backward compatibility - int64 converted to bigint type
SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
Collections.emptyMap(), Collections.emptyList(), false);
assertEquals("bigint", schemaDifference.getAddColumnTypes().get("`my_element`"));
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
schemaDifference.getAddColumnTypes(), Collections.emptyList(), false);
assertTrue(schemaDifference.isEmpty());
// verify schema difference is calculated correctly when supportTimestamp is enabled
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
Collections.emptyMap(), Collections.emptyList(), true);
assertEquals("TIMESTAMP", schemaDifference.getAddColumnTypes().get("`my_element`"));
schemaDifference = HiveSchemaUtil.getSchemaDifference(schema,
schemaDifference.getAddColumnTypes(), Collections.emptyList(), true);
assertTrue(schemaDifference.isEmpty());
}
@ParameterizedTest
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource({"syncModeAndSchemaFromCommitMetadata"})
public void testBasicSync(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
// we need renew the hiveclient after tool.syncHoodieTable(), because it will close hive
// session, then lead to connection retry, we can see there is a exception at log.
@@ -112,36 +254,36 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.getDataSchema().getColumns().size() + 1,
"Hive Schema should match the table schema + partition field");
assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Adding of new partitions
List<String> newPartition = Arrays.asList("2050/01/01");
hiveClient.addPartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, Arrays.asList());
assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"No new partition should be added");
hiveClient.addPartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, newPartition);
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.addPartitionsToTable(hiveSyncConfig.tableName, newPartition);
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"New partition should be added");
// Update partitions
hiveClient.updatePartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, Arrays.asList());
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.updatePartitionsToTable(hiveSyncConfig.tableName, Arrays.asList());
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Partition count should remain the same");
hiveClient.updatePartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, newPartition);
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
hiveClient.updatePartitionsToTable(hiveSyncConfig.tableName, newPartition);
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Partition count should remain the same");
// Alter partitions
// Manually change a hive partition location to check if the sync will detect
// it and generate a partition update event for it.
hiveClient.updateHiveSQL("ALTER TABLE `" + HiveTestUtil.hiveSyncConfig.tableName
ddlExecutor.runSQL("ALTER TABLE `" + hiveSyncConfig.tableName
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
//writtenPartitionsSince.add(newPartition.get(0));
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
@@ -149,22 +291,22 @@ public class TestHiveSyncTool {
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
"The one partition event must of type UPDATE");
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
// Sync should update the changed partition to correct path
List<Partition> tablePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
List<Partition> tablePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be 100");
}
@ParameterizedTest
@MethodSource({"syncDataSourceTableParams"})
public void testSyncCOWTableWithProperties(boolean useJdbc,
boolean useSchemaFromCommitMetadata,
boolean syncAsDataSourceTable) throws Exception {
public void testSyncCOWTableWithProperties(boolean useSchemaFromCommitMetadata,
boolean syncAsDataSourceTable,
String syncMode) throws Exception {
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
Map<String, String> serdeProperties = new HashMap<String, String>() {
@@ -179,14 +321,15 @@ public class TestHiveSyncTool {
put("tp_1", "p1");
}
};
hiveSyncConfig.syncMode = syncMode;
hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable;
hiveSyncConfig.useJdbc = useJdbc;
hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties);
hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties);
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
SessionState.start(HiveTestUtil.getHiveConf());
@@ -254,9 +397,9 @@ public class TestHiveSyncTool {
@ParameterizedTest
@MethodSource({"syncDataSourceTableParams"})
public void testSyncMORTableWithProperties(boolean useJdbc,
boolean useSchemaFromCommitMetadata,
boolean syncAsDataSourceTable) throws Exception {
public void testSyncMORTableWithProperties(boolean useSchemaFromCommitMetadata,
boolean syncAsDataSourceTable,
String syncMode) throws Exception {
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 3;
Map<String, String> serdeProperties = new HashMap<String, String>() {
@@ -272,7 +415,7 @@ public class TestHiveSyncTool {
}
};
hiveSyncConfig.syncAsSparkDataSourceTable = syncAsDataSourceTable;
hiveSyncConfig.useJdbc = useJdbc;
hiveSyncConfig.syncMode = syncMode;
hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties);
hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties);
String instantTime = "100";
@@ -325,13 +468,13 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource({"useJdbcAndSchemaFromCommitMetadataAndManagedTable"})
public void testSyncManagedTable(boolean useJdbc,
boolean useSchemaFromCommitMetadata,
boolean isManagedTable) throws Exception {
@MethodSource({"syncModeAndSchemaFromCommitMetadataAndManagedTable"})
public void testSyncManagedTable(boolean useSchemaFromCommitMetadata,
boolean isManagedTable,
String syncMode) throws Exception {
HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig;
hiveSyncConfig.useJdbc = useJdbc;
hiveSyncConfig.syncMode = syncMode;
hiveSyncConfig.createManagedTable = isManagedTable;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
@@ -356,20 +499,39 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testSyncIncremental(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testSyncWithSchema(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
String commitTime = "100";
HiveTestUtil.createCOWTableWithSchema(commitTime, "/complex.schema.avsc");
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
HoodieHiveClient hiveClient =
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertEquals(1, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
}
@ParameterizedTest
@MethodSource("syncMode")
public void testSyncIncremental(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
assertEquals(5, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be updated in the TBLPROPERTIES");
// Now lets create more partitions and these are the only ones which needs to be synced
@@ -378,37 +540,37 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
// Lets do the sync
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
// Sync should add the one partition
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"The one partition we wrote should be added to hive");
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be 101");
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testSyncIncrementalWithSchemaEvolution(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
int fields = hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size();
int fields = hiveClient.getTableSchema(hiveSyncConfig.tableName).size();
// Now lets create more partitions and these are the only ones which needs to be synced
DateTime dateTime = DateTime.now().plusDays(6);
@@ -416,51 +578,51 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
// Lets do the sync
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertEquals(fields + 3, hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
assertEquals(fields + 3, hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
"Hive Schema has evolved and should not be 3 more field");
assertEquals("BIGINT", hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("favorite_number"),
assertEquals("BIGINT", hiveClient.getTableSchema(hiveSyncConfig.tableName).get("favorite_number"),
"Hive Schema has evolved - Field favorite_number has evolved from int to long");
assertTrue(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"),
assertTrue(hiveClient.getTableSchema(hiveSyncConfig.tableName).containsKey("favorite_movie"),
"Hive Schema has evolved - Field favorite_movie was added");
// Sync should add the one partition
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
"The one partition we wrote should be added to hive");
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was synced should be 101");
}
@ParameterizedTest
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncModeAndSchemaFromCommitMetadata")
public void testSyncMergeOnRead(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
String deltaCommitTime = "101";
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
useSchemaFromCommitMetadata);
String roTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
String roTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
if (useSchemaFromCommitMetadata) {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the table schema + partition field");
}
@@ -478,19 +640,19 @@ public class TestHiveSyncTool {
HiveTestUtil.addMORPartitions(1, true, false,
useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
if (useSchemaFromCommitMetadata) {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the evolved table schema + partition field");
}
// Sync should add the one partition
@@ -501,38 +663,38 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncModeAndSchemaFromCommitMetadata")
public void testSyncMergeOnReadRT(boolean useSchemaFromCommitMetadata, String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
String deltaCommitTime = "101";
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
String snapshotTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClientRT.doesTableExist(snapshotTableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
"Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClientRT.doesTableExist(snapshotTableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
"Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes");
if (useSchemaFromCommitMetadata) {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the table schema + partition field");
}
@@ -549,19 +711,19 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
HiveTestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
hiveClientRT = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
if (useSchemaFromCommitMetadata) {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size(),
"Hive Schema should match the evolved table schema + partition field");
}
// Sync should add the one partition
@@ -572,9 +734,10 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testMultiPartitionKeySync(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, true);
@@ -585,11 +748,11 @@ public class TestHiveSyncTool {
hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day");
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
@@ -607,7 +770,7 @@ public class TestHiveSyncTool {
String commitTime2 = "101";
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
@@ -615,7 +778,7 @@ public class TestHiveSyncTool {
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
// Sync should add the one partition
@@ -629,9 +792,9 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
@@ -647,9 +810,10 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNonPartitionedSync(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testNonPartitionedSync(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, true);
@@ -661,11 +825,11 @@ public class TestHiveSyncTool {
hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day");
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
@@ -677,29 +841,30 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testReadSchemaForMOR(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String commitTime = "100";
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
String snapshotTableName = hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes");
// Schema being read from compacted base files
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
SchemaTestUtil.getSimpleSchema().getFields().size() + hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field");
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
@@ -711,13 +876,13 @@ public class TestHiveSyncTool {
HiveTestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
hiveClientRT = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
// Schema being read from the log files
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
SchemaTestUtil.getEvolvedSchema().getFields().size() + hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field");
// Sync should add the one partition
@@ -727,26 +892,27 @@ public class TestHiveSyncTool {
}
@Test
public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException {
public void testConnectExceptionIgnoreConfigSet() throws IOException, URISyntaxException, HiveException, MetaException {
hiveSyncConfig.useJdbc = true;
HiveTestUtil.hiveSyncConfig.useJdbc = true;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, false);
HoodieHiveClient hiveClient =
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(hiveSyncConfig);
syncToolConfig.ignoreExceptions = true;
syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl
.replace(String.valueOf(HiveTestUtil.hiveTestService.getHiveServerPort()), String.valueOf(NetworkTestUtils.nextFreePort()));
HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
"Table " + hiveSyncConfig.tableName + " should not exist initially");
}
private void verifyOldParquetFileTest(HoodieHiveClient hiveClient, String emptyCommitTime) throws Exception {
@@ -772,9 +938,9 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testPickingOlderParquetFileIfLatestIsEmptyCommit(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testPickingOlderParquetFileIfLatestIsEmptyCommit(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
@@ -793,9 +959,9 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFails(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
@@ -836,9 +1002,9 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testNotPickingOlderParquetFileWhenLatestCommitReadFailsForExistingTable(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
final String commitTime = "100";
HiveTestUtil.createCOWTable(commitTime, 1, true);
@@ -883,9 +1049,9 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
@MethodSource("useJdbc")
public void testTypeConverter(boolean useJdbc) throws Exception {
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
@MethodSource("syncMode")
public void testTypeConverter(String syncMode) throws Exception {
hiveSyncConfig.syncMode = syncMode;
HiveTestUtil.hiveSyncConfig.batchSyncNum = 2;
HiveTestUtil.createCOWTable("100", 5, true);
HoodieHiveClient hiveClient =
@@ -895,30 +1061,30 @@ public class TestHiveSyncTool {
String dropTableSql = String.format("DROP TABLE IF EXISTS %s ", tableAbsoluteName);
String createTableSqlPrefix = String.format("CREATE TABLE IF NOT EXISTS %s ", tableAbsoluteName);
String errorMsg = "An error occurred in decimal type converting.";
hiveClient.updateHiveSQL(dropTableSql);
ddlExecutor.runSQL(dropTableSql);
// test one column in DECIMAL
String oneTargetColumnSql = createTableSqlPrefix + "(`decimal_col` DECIMAL(9,8), `bigint_col` BIGINT)";
hiveClient.updateHiveSQL(oneTargetColumnSql);
ddlExecutor.runSQL(oneTargetColumnSql);
System.out.println(hiveClient.getTableSchema(tableName));
assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)"), errorMsg);
hiveClient.updateHiveSQL(dropTableSql);
ddlExecutor.runSQL(dropTableSql);
// test multiple columns in DECIMAL
String multipleTargetColumnSql =
createTableSqlPrefix + "(`decimal_col1` DECIMAL(9,8), `bigint_col` BIGINT, `decimal_col2` DECIMAL(7,4))";
hiveClient.updateHiveSQL(multipleTargetColumnSql);
ddlExecutor.runSQL(multipleTargetColumnSql);
System.out.println(hiveClient.getTableSchema(tableName));
assertTrue(hiveClient.getTableSchema(tableName).containsValue("DECIMAL(9,8)")
&& hiveClient.getTableSchema(tableName).containsValue("DECIMAL(7,4)"), errorMsg);
hiveClient.updateHiveSQL(dropTableSql);
ddlExecutor.runSQL(dropTableSql);
// test no columns in DECIMAL
String noTargetColumnsSql = createTableSqlPrefix + "(`bigint_col` BIGINT)";
hiveClient.updateHiveSQL(noTargetColumnsSql);
ddlExecutor.runSQL(noTargetColumnsSql);
System.out.println(hiveClient.getTableSchema(tableName));
assertTrue(hiveClient.getTableSchema(tableName).size() == 1 && hiveClient.getTableSchema(tableName)
.containsValue("BIGINT"), errorMsg);
hiveClient.updateHiveSQL(dropTableSql);
ddlExecutor.runSQL(dropTableSql);
}
}

View File

@@ -25,10 +25,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
@@ -98,20 +101,20 @@ public class HiveSyncFunctionalTestHarness {
return new HoodieHiveClient(hiveSyncConfig, hiveConf(), fs());
}
public void dropTables(String database, String... tables) throws IOException {
public void dropTables(String database, String... tables) throws IOException, HiveException, MetaException {
HiveSyncConfig hiveSyncConfig = hiveSyncConf();
hiveSyncConfig.databaseName = database;
for (String table : tables) {
hiveSyncConfig.tableName = table;
hiveClient(hiveSyncConfig).updateHiveSQL("drop table if exists " + table);
new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop table if exists " + table);
}
}
public void dropDatabases(String... databases) throws IOException {
public void dropDatabases(String... databases) throws IOException, HiveException, MetaException {
HiveSyncConfig hiveSyncConfig = hiveSyncConf();
for (String database : databases) {
hiveSyncConfig.databaseName = database;
hiveClient(hiveSyncConfig).updateHiveSQL("drop database if exists " + database);
new HiveQueryDDLExecutor(hiveSyncConfig, fs(), hiveConf()).runSQL("drop database if exists " + database);
}
}

View File

@@ -42,7 +42,8 @@ import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
@@ -51,6 +52,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -91,8 +94,9 @@ public class HiveTestUtil {
private static DateTimeFormatter dtfOut;
public static FileSystem fileSystem;
private static Set<String> createdTablesSet = new HashSet<>();
public static QueryBasedDDLExecutor ddlExecutor;
public static void setUp() throws IOException, InterruptedException {
public static void setUp() throws IOException, InterruptedException, HiveException, MetaException {
configuration = new Configuration();
if (zkServer == null) {
zkService = new ZookeeperTestService(configuration);
@@ -116,11 +120,12 @@ public class HiveTestUtil {
hiveSyncConfig.partitionFields = Collections.singletonList("datestr");
dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
ddlExecutor = new HiveQueryDDLExecutor(hiveSyncConfig, fileSystem, getHiveConf());
clear();
}
public static void clear() throws IOException {
public static void clear() throws IOException, HiveException, MetaException {
fileSystem.delete(new Path(hiveSyncConfig.basePath), true);
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
@@ -128,13 +133,12 @@ public class HiveTestUtil {
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
HoodieHiveClient client = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), fileSystem);
for (String tableName : createdTablesSet) {
client.updateHiveSQL("drop table if exists " + tableName);
ddlExecutor.runSQL("drop table if exists " + tableName);
}
createdTablesSet.clear();
client.updateHiveSQL("drop database if exists " + hiveSyncConfig.databaseName);
client.updateHiveSQL("create database " + hiveSyncConfig.databaseName);
ddlExecutor.runSQL("drop database if exists " + hiveSyncConfig.databaseName);
ddlExecutor.runSQL("create database " + hiveSyncConfig.databaseName);
}
public static HiveConf getHiveConf() {
@@ -172,6 +176,40 @@ public class HiveTestUtil {
createCommitFile(commitMetadata, instantTime);
}
public static void createCOWTableWithSchema(String instantTime, String schemaFileName)
throws IOException, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(configuration, hiveSyncConfig.basePath);
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now().withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
String partitionPath = dtfOut.print(dateTime);
Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
fileSystem.makeQualified(partPath);
fileSystem.mkdirs(partPath);
List<HoodieWriteStat> writeStats = new ArrayList<>();
String fileId = UUID.randomUUID().toString();
Path filePath = new Path(partPath.toString() + "/" + FSUtils.makeDataFileName(instantTime, "1-0-1", fileId));
Schema schema = SchemaTestUtil.getSchemaFromResource(HiveTestUtil.class, schemaFileName);
generateParquetDataWithSchema(filePath, schema);
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
writeStat.setPath(filePath.toString());
writeStats.add(writeStat);
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema.toString());
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
}
public static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException, InterruptedException {
@@ -330,6 +368,27 @@ public class HiveTestUtil {
writer.close();
}
private static void generateParquetDataWithSchema(Path filePath, Schema schema)
throws IOException, URISyntaxException {
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name());
HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(parquetSchema, schema, filter);
ParquetWriter writer = new ParquetWriter(filePath, writeSupport, CompressionCodecName.GZIP, 120 * 1024 * 1024,
ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_PAGE_SIZE, ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED, ParquetWriter.DEFAULT_WRITER_VERSION, fileSystem.getConf());
List<IndexedRecord> testRecords = SchemaTestUtil.generateTestRecordsForSchema(schema);
testRecords.forEach(s -> {
try {
writer.write(s);
} catch (IOException e) {
fail("IOException while writing test records as parquet" + e.toString());
}
});
writer.close();
}
private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
throws IOException, InterruptedException, URISyntaxException {
Schema schema = getTestDataSchema(isLogSchemaSimple);