[HUDI-811] Restructure test packages in hudi-common (#1644)
* [HUDI-811] Restructure test packages in hudi-common
This commit is contained in:
@@ -19,11 +19,11 @@
|
||||
package org.apache.hudi.hive;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
|
||||
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent.PartitionEventType;
|
||||
import org.apache.hudi.hive.testutils.TestUtil;
|
||||
import org.apache.hudi.hive.testutils.HiveTestUtil;
|
||||
import org.apache.hudi.hive.util.HiveSchemaUtil;
|
||||
|
||||
import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
@@ -60,17 +60,17 @@ public class TestHiveSyncTool {
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws IOException, InterruptedException {
|
||||
TestUtil.setUp();
|
||||
HiveTestUtil.setUp();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void teardown() throws IOException {
|
||||
TestUtil.clear();
|
||||
HiveTestUtil.clear();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void cleanUpClass() {
|
||||
TestUtil.shutdown();
|
||||
HiveTestUtil.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -154,51 +154,51 @@ public class TestHiveSyncTool {
|
||||
@ParameterizedTest
|
||||
@MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
|
||||
public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String instantTime = "100";
|
||||
TestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
|
||||
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
|
||||
HoodieHiveClient hiveClient =
|
||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
|
||||
"Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
assertFalse(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
|
||||
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
assertTrue(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
|
||||
"Table " + TestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
|
||||
assertEquals(hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
assertTrue(hiveClient.doesTableExist(HiveTestUtil.hiveSyncConfig.tableName),
|
||||
"Table " + HiveTestUtil.hiveSyncConfig.tableName + " should exist after sync completes");
|
||||
assertEquals(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
hiveClient.getDataSchema().getColumns().size() + 1,
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
assertEquals(5, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"Table partitions should match the number of partitions we wrote");
|
||||
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
|
||||
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||
|
||||
// Adding of new partitions
|
||||
List<String> newPartition = Arrays.asList("2050/01/01");
|
||||
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
||||
assertEquals(5, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
hiveClient.addPartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
||||
assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"No new partition should be added");
|
||||
hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
|
||||
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
hiveClient.addPartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, newPartition);
|
||||
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"New partition should be added");
|
||||
|
||||
// Update partitions
|
||||
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
||||
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
hiveClient.updatePartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, Arrays.asList());
|
||||
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"Partition count should remain the same");
|
||||
hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
|
||||
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
hiveClient.updatePartitionsToTable(HiveTestUtil.hiveSyncConfig.tableName, newPartition);
|
||||
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"Partition count should remain the same");
|
||||
|
||||
// Alter partitions
|
||||
// Manually change a hive partition location to check if the sync will detect
|
||||
// it and generage a partition update event for it.
|
||||
hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName
|
||||
hiveClient.updateHiveSQL("ALTER TABLE `" + HiveTestUtil.hiveSyncConfig.tableName
|
||||
+ "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
|
||||
|
||||
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
||||
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
|
||||
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
|
||||
writtenPartitionsSince.add(newPartition.get(0));
|
||||
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
|
||||
@@ -206,119 +206,119 @@ public class TestHiveSyncTool {
|
||||
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
|
||||
"The one partition event must of type UPDATE");
|
||||
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
// Sync should update the changed partition to correct path
|
||||
List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
||||
List<Partition> tablePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
|
||||
assertEquals(6, tablePartitions.size(), "The one partition we wrote should be added to hive");
|
||||
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
|
||||
"The last commit that was sycned should be 100");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbc")
|
||||
public void testSyncIncremental(boolean useJdbc) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String commitTime1 = "100";
|
||||
TestUtil.createCOWTable(commitTime1, 5, true);
|
||||
HiveTestUtil.createCOWTable(commitTime1, 5, true);
|
||||
HoodieHiveClient hiveClient =
|
||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
assertEquals(5, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
assertEquals(5, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"Table partitions should match the number of partitions we wrote");
|
||||
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||
assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
|
||||
"The last commit that was sycned should be updated in the TBLPROPERTIES");
|
||||
|
||||
// Now lets create more parititions and these are the only ones which needs to be synced
|
||||
DateTime dateTime = DateTime.now().plusDays(6);
|
||||
String commitTime2 = "101";
|
||||
TestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
|
||||
HiveTestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
|
||||
|
||||
// Lets do the sync
|
||||
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(commitTime1));
|
||||
assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
|
||||
List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
|
||||
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
|
||||
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
|
||||
assertEquals(1, partitionEvents.size(), "There should be only one paritition event");
|
||||
assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
|
||||
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
// Sync should add the one partition
|
||||
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"The one partition we wrote should be added to hive");
|
||||
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
|
||||
"The last commit that was sycned should be 101");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbc")
|
||||
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String commitTime1 = "100";
|
||||
TestUtil.createCOWTable(commitTime1, 5, true);
|
||||
HiveTestUtil.createCOWTable(commitTime1, 5, true);
|
||||
HoodieHiveClient hiveClient =
|
||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
|
||||
int fields = hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size();
|
||||
int fields = hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size();
|
||||
|
||||
// Now lets create more parititions and these are the only ones which needs to be synced
|
||||
DateTime dateTime = DateTime.now().plusDays(6);
|
||||
String commitTime2 = "101";
|
||||
TestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
|
||||
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
|
||||
|
||||
// Lets do the sync
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
|
||||
assertEquals(fields + 3, hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
assertEquals(fields + 3, hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"Hive Schema has evolved and should not be 3 more field");
|
||||
assertEquals("BIGINT", hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).get("favorite_number"),
|
||||
assertEquals("BIGINT", hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).get("favorite_number"),
|
||||
"Hive Schema has evolved - Field favorite_number has evolved from int to long");
|
||||
assertTrue(hiveClient.getTableSchema(TestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"),
|
||||
assertTrue(hiveClient.getTableSchema(HiveTestUtil.hiveSyncConfig.tableName).containsKey("favorite_movie"),
|
||||
"Hive Schema has evolved - Field favorite_movie was added");
|
||||
|
||||
// Sync should add the one partition
|
||||
assertEquals(6, hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size(),
|
||||
assertEquals(6, hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName).size(),
|
||||
"The one partition we wrote should be added to hive");
|
||||
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get(),
|
||||
assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(HiveTestUtil.hiveSyncConfig.tableName).get(),
|
||||
"The last commit that was sycned should be 101");
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
|
||||
public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String instantTime = "100";
|
||||
String deltaCommitTime = "101";
|
||||
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
|
||||
useSchemaFromCommitMetadata);
|
||||
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
|
||||
useSchemaFromCommitMetadata);
|
||||
|
||||
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
|
||||
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + TestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||
String roTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
|
||||
HoodieHiveClient hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
assertFalse(hiveClient.doesTableExist(roTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + " should not exist initially");
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
|
||||
assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
|
||||
|
||||
if (useSchemaFromCommitMetadata) {
|
||||
assertEquals(hiveClient.getTableSchema(roTableName).size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
} else {
|
||||
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
|
||||
assertEquals(hiveClient.getTableSchema(roTableName).size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
}
|
||||
|
||||
assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
|
||||
@@ -331,24 +331,24 @@ public class TestHiveSyncTool {
|
||||
String commitTime2 = "102";
|
||||
String deltaCommitTime2 = "103";
|
||||
|
||||
TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
|
||||
TestUtil.addMORPartitions(1, true, false,
|
||||
HiveTestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
|
||||
HiveTestUtil.addMORPartitions(1, true, false,
|
||||
useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
|
||||
// Lets do the sync
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
|
||||
if (useSchemaFromCommitMetadata) {
|
||||
assertEquals(hiveClient.getTableSchema(roTableName).size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
} else {
|
||||
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
|
||||
assertEquals(hiveClient.getTableSchema(roTableName).size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
}
|
||||
// Sync should add the one partition
|
||||
assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(),
|
||||
@@ -360,36 +360,36 @@ public class TestHiveSyncTool {
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbcAndSchemaFromCommitMetadata")
|
||||
public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String instantTime = "100";
|
||||
String deltaCommitTime = "101";
|
||||
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||
TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
|
||||
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
|
||||
HoodieHiveClient hiveClientRT =
|
||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
|
||||
assertFalse(hiveClientRT.doesTableExist(snapshotTableName),
|
||||
"Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
"Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
+ " should not exist initially");
|
||||
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
|
||||
assertTrue(hiveClientRT.doesTableExist(snapshotTableName),
|
||||
"Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
"Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
+ " should exist after sync completes");
|
||||
|
||||
if (useSchemaFromCommitMetadata) {
|
||||
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
} else {
|
||||
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
|
||||
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
}
|
||||
|
||||
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
|
||||
@@ -402,23 +402,23 @@ public class TestHiveSyncTool {
|
||||
String commitTime2 = "102";
|
||||
String deltaCommitTime2 = "103";
|
||||
|
||||
TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
|
||||
TestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
|
||||
HiveTestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
|
||||
HiveTestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
|
||||
// Lets do the sync
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
hiveClientRT = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
|
||||
if (useSchemaFromCommitMetadata) {
|
||||
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
} else {
|
||||
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
|
||||
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
}
|
||||
// Sync should add the one partition
|
||||
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
|
||||
@@ -430,21 +430,21 @@ public class TestHiveSyncTool {
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbc")
|
||||
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String instantTime = "100";
|
||||
TestUtil.createCOWTable(instantTime, 5, true);
|
||||
HiveTestUtil.createCOWTable(instantTime, 5, true);
|
||||
|
||||
HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
|
||||
HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
|
||||
hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
|
||||
hiveSyncConfig.tableName = "multi_part_key";
|
||||
hiveSyncConfig.partitionFields = Arrays.asList("year", "month", "day");
|
||||
TestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
||||
HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
|
||||
|
||||
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
|
||||
"Table " + hiveSyncConfig.tableName + " should not exist initially");
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
|
||||
"Table " + hiveSyncConfig.tableName + " should exist after sync completes");
|
||||
@@ -460,27 +460,27 @@ public class TestHiveSyncTool {
|
||||
@ParameterizedTest
|
||||
@MethodSource("useJdbc")
|
||||
public void testReadSchemaForMOR(boolean useJdbc) throws Exception {
|
||||
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
HiveTestUtil.hiveSyncConfig.useJdbc = useJdbc;
|
||||
String commitTime = "100";
|
||||
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||
TestUtil.createMORTable(commitTime, "", 5, false, true);
|
||||
String snapshotTableName = HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
|
||||
HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
|
||||
HoodieHiveClient hiveClientRT =
|
||||
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
|
||||
assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
assertFalse(hiveClientRT.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
+ " should not exist initially");
|
||||
|
||||
// Lets do the sync
|
||||
HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
HiveSyncTool tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
|
||||
assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
assertTrue(hiveClientRT.doesTableExist(snapshotTableName), "Table " + HiveTestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
|
||||
+ " should exist after sync completes");
|
||||
|
||||
// Schema being read from compacted base files
|
||||
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the table schema + partition field");
|
||||
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
|
||||
|
||||
@@ -489,16 +489,16 @@ public class TestHiveSyncTool {
|
||||
String commitTime2 = "102";
|
||||
String deltaCommitTime2 = "103";
|
||||
|
||||
TestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
|
||||
HiveTestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
|
||||
// Lets do the sync
|
||||
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
tool = new HiveSyncTool(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
tool.syncHoodieTable();
|
||||
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
|
||||
hiveClientRT = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
|
||||
|
||||
// Schema being read from the log files
|
||||
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
|
||||
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
|
||||
"Hive Schema should match the evolved table schema + partition field");
|
||||
// Sync should add the one partition
|
||||
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive");
|
||||
|
||||
@@ -18,7 +18,7 @@
|
||||
|
||||
package org.apache.hudi.hive.testutils;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieTestUtils;
|
||||
import org.apache.hudi.common.testutils.HoodieTestUtils;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
@@ -23,8 +23,6 @@ import org.apache.hudi.common.bloom.BloomFilter;
|
||||
import org.apache.hudi.common.bloom.BloomFilterFactory;
|
||||
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.minicluster.HdfsTestService;
|
||||
import org.apache.hudi.common.minicluster.ZookeeperTestService;
|
||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
||||
import org.apache.hudi.common.model.HoodieBaseFile;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
@@ -39,8 +37,10 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
|
||||
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.testutils.SchemaTestUtil;
|
||||
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
|
||||
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
|
||||
import org.apache.hudi.common.util.FileIOUtils;
|
||||
import org.apache.hudi.common.util.SchemaTestUtil;
|
||||
import org.apache.hudi.hive.HiveSyncConfig;
|
||||
import org.apache.hudi.hive.HiveSyncTool;
|
||||
import org.apache.hudi.hive.HoodieHiveClient;
|
||||
@@ -80,7 +80,7 @@ import java.util.UUID;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
public class TestUtil {
|
||||
public class HiveTestUtil {
|
||||
|
||||
private static MiniDFSCluster dfsCluster;
|
||||
private static ZooKeeperServer zkServer;
|
||||
Reference in New Issue
Block a user