Add MOR integration testing
This commit is contained in:
committed by
Balaji Varadarajan
parent
b6057c5e0e
commit
bd77dc792c
@@ -19,6 +19,7 @@
|
|||||||
package org.apache.hudi.integ;
|
package org.apache.hudi.integ;
|
||||||
|
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.common.model.HoodieTableType;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
@@ -39,8 +40,8 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
|
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
|
String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
|
||||||
testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.SINGLE_KEY_PARTITIONED);
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED);
|
||||||
executeHiveCommand("drop table if exists " + hiveTableName);
|
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -51,8 +52,8 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
|
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
|
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
|
||||||
testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.MULTI_KEYS_PARTITIONED);
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED);
|
||||||
executeHiveCommand("drop table if exists " + hiveTableName);
|
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@@ -63,29 +64,65 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
|
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
|
||||||
String hiveTableName = "docker_hoodie_non_partition_key_cow_test";
|
String hiveTableName = "docker_hoodie_non_partition_key_cow_test";
|
||||||
testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.NON_PARTITIONED);
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED);
|
||||||
executeHiveCommand("drop table if exists " + hiveTableName);
|
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set
|
||||||
|
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
|
||||||
|
* console.
|
||||||
|
*/
|
||||||
|
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
|
||||||
|
String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
|
||||||
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED);
|
||||||
|
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with multiple partition-keys
|
||||||
|
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
|
||||||
|
* in hive console.
|
||||||
|
*/
|
||||||
|
public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable() throws Exception {
|
||||||
|
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
|
||||||
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED);
|
||||||
|
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* A basic integration test that runs HoodieJavaApp to create a sample non-partitioned MOR Hoodie data-set and
|
||||||
|
* performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
|
||||||
|
* console.
|
||||||
|
*/
|
||||||
|
public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
|
||||||
|
String hiveTableName = "docker_hoodie_non_partition_key_mor_test";
|
||||||
|
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED);
|
||||||
|
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie data-set and performs upserts on it.
|
* A basic integration test that runs HoodieJavaApp to create a sample Hoodie data-set and performs upserts on it.
|
||||||
* Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add
|
* Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add
|
||||||
* spark-shell test-case
|
* spark-shell test-case
|
||||||
*/
|
*/
|
||||||
public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType partitionType) throws Exception {
|
public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
String hdfsPath = "/" + hiveTableName;
|
String hdfsPath = "/" + hiveTableName;
|
||||||
String hdfsUrl = "hdfs://namenode" + hdfsPath;
|
String hdfsUrl = "hdfs://namenode" + hdfsPath;
|
||||||
|
|
||||||
// Drop Table if it exists
|
// Drop Table if it exists
|
||||||
String hiveDropCmd = "drop table if exists " + hiveTableName;
|
|
||||||
try {
|
try {
|
||||||
executeHiveCommand(hiveDropCmd);
|
dropHiveTables(hiveTableName, tableType);
|
||||||
} catch (AssertionError ex) {
|
} catch (AssertionError ex) {
|
||||||
// In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up
|
// In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up
|
||||||
// Workaround to sleep for 5 secs and retry
|
// Workaround to sleep for 5 secs and retry
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
executeHiveCommand(hiveDropCmd);
|
dropHiveTables(hiveTableName, tableType);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure table does not exist
|
// Ensure table does not exist
|
||||||
@@ -96,13 +133,13 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
String cmd;
|
String cmd;
|
||||||
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
|
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
|
||||||
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
||||||
+ " --hive-table " + hiveTableName;
|
+ " --table-type " + tableType + " --hive-table " + hiveTableName;
|
||||||
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
|
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
|
||||||
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
||||||
+ " --hive-table " + hiveTableName + " --use-multi-partition-keys";
|
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
|
||||||
} else {
|
} else {
|
||||||
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
|
||||||
+ " --hive-table " + hiveTableName + " --non-partitioned";
|
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
|
||||||
}
|
}
|
||||||
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
|
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
|
||||||
|
|
||||||
@@ -115,6 +152,13 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
Assert.assertEquals("Expecting 100 rows to be present in the new table", 100,
|
Assert.assertEquals("Expecting 100 rows to be present in the new table", 100,
|
||||||
Integer.parseInt(stdOutErr.getLeft().trim()));
|
Integer.parseInt(stdOutErr.getLeft().trim()));
|
||||||
|
|
||||||
|
// If is MOR table, ensure realtime table row count is 100 (without duplicates)
|
||||||
|
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
|
||||||
|
stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName + "_rt");
|
||||||
|
Assert.assertEquals("Expecting 100 rows to be present in the realtime table,", 100,
|
||||||
|
Integer.parseInt(stdOutErr.getLeft().trim()));
|
||||||
|
}
|
||||||
|
|
||||||
// Make the HDFS dataset non-hoodie and run the same query
|
// Make the HDFS dataset non-hoodie and run the same query
|
||||||
// Checks for interoperability with non-hoodie tables
|
// Checks for interoperability with non-hoodie tables
|
||||||
|
|
||||||
@@ -126,4 +170,11 @@ public class ITTestHoodieSanity extends ITTestBase {
|
|||||||
Assert.assertEquals("Expecting 200 rows to be present in the new table", 200,
|
Assert.assertEquals("Expecting 200 rows to be present in the new table", 200,
|
||||||
Integer.parseInt(stdOutErr.getLeft().trim()));
|
Integer.parseInt(stdOutErr.getLeft().trim()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
|
||||||
|
executeHiveCommand("drop table if exists " + hiveTableName);
|
||||||
|
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
|
||||||
|
executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user