1
0

[HUDI-3689] Fix UT failures in TestHoodieDeltaStreamer (#5120)

This commit is contained in:
Raymond Xu
2022-03-24 09:10:33 -07:00
committed by GitHub
parent b14706502b
commit 686da41696
5 changed files with 21 additions and 251 deletions

View File

@@ -1,133 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.hive.testutils;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig;
import org.apache.hudi.hive.replication.HiveSyncGlobalCommitTool;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.LOCAL_HIVE_SITE_URI;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_BASE_PATH;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SERVER_JDBC_URLS;
import static org.apache.hudi.hive.replication.HiveSyncGlobalCommitConfig.REMOTE_HIVE_SITE_URI;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHiveSyncGlobalCommitTool {
TestCluster localCluster;
TestCluster remoteCluster;
private static String DB_NAME = "foo";
private static String TBL_NAME = "bar";
private HiveSyncGlobalCommitConfig getGlobalCommitConfig(
String commitTime, String dbName, String tblName) throws Exception {
HiveSyncGlobalCommitConfig config = new HiveSyncGlobalCommitConfig();
config.properties.setProperty(LOCAL_HIVE_SITE_URI, localCluster.getHiveSiteXmlLocation());
config.properties.setProperty(REMOTE_HIVE_SITE_URI, remoteCluster.getHiveSiteXmlLocation());
config.properties.setProperty(LOCAL_HIVE_SERVER_JDBC_URLS, localCluster.getHiveJdBcUrl());
config.properties.setProperty(REMOTE_HIVE_SERVER_JDBC_URLS, remoteCluster.getHiveJdBcUrl());
config.properties.setProperty(LOCAL_BASE_PATH, localCluster.tablePath(dbName, tblName));
config.properties.setProperty(REMOTE_BASE_PATH, remoteCluster.tablePath(dbName, tblName));
config.globallyReplicatedTimeStamp = commitTime;
config.hiveUser = System.getProperty("user.name");
config.hivePass = "";
config.databaseName = dbName;
config.tableName = tblName;
config.basePath = localCluster.tablePath(dbName, tblName);
config.assumeDatePartitioning = true;
config.usePreApacheInputFormat = false;
config.partitionFields = Collections.singletonList("datestr");
return config;
}
private void compareEqualLastReplicatedTimeStamp(HiveSyncGlobalCommitConfig config) throws Exception {
assertEquals(localCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
remoteCluster.getHMSClient().getTable(config.databaseName, config.tableName).getParameters().get(GLOBALLY_CONSISTENT_READ_TIMESTAMP),
"compare replicated timestamps");
}
@BeforeEach
public void setUp() throws Exception {
localCluster = new TestCluster();
localCluster.setup();
remoteCluster = new TestCluster();
remoteCluster.setup();
localCluster.forceCreateDb(DB_NAME);
remoteCluster.forceCreateDb(DB_NAME);
localCluster.dfsCluster.getFileSystem().delete(new Path(localCluster.tablePath(DB_NAME, TBL_NAME)), true);
remoteCluster.dfsCluster.getFileSystem().delete(new Path(remoteCluster.tablePath(DB_NAME, TBL_NAME)), true);
}
@AfterEach
public void clear() throws Exception {
localCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
remoteCluster.getHMSClient().dropTable(DB_NAME, TBL_NAME);
localCluster.shutDown();
remoteCluster.shutDown();
}
@Test
public void testBasicGlobalCommit() throws Exception {
String commitTime = "100";
localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
// simulate drs
remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
assertTrue(tool.commit());
compareEqualLastReplicatedTimeStamp(config);
}
@Test
public void testBasicRollback() throws Exception {
String commitTime = "100";
localCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
// simulate drs
remoteCluster.createCOWTable(commitTime, 5, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitConfig config = getGlobalCommitConfig(commitTime, DB_NAME, TBL_NAME);
HiveSyncGlobalCommitTool tool = new HiveSyncGlobalCommitTool(config);
assertFalse(localCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
// stop the remote cluster hive server to simulate cluster going down
remoteCluster.stopHiveServer2();
assertFalse(tool.commit());
assertEquals(commitTime, localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
assertTrue(tool.rollback()); // do a rollback
assertNotEquals(commitTime, localCluster.getHMSClient()
.getTable(config.databaseName, config.tableName).getParameters()
.get(GLOBALLY_CONSISTENT_READ_TIMESTAMP));
assertFalse(remoteCluster.getHMSClient().tableExists(DB_NAME, TBL_NAME));
remoteCluster.startHiveServer2();
}
}

View File

@@ -1,97 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestUtilHelpers {
public static class TransformerFoo implements Transformer {
@Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
return null;
}
}
public static class TransformerBar implements Transformer {
@Override
public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset, TypedProperties properties) {
return null;
}
}
@Nested
public class TestCreateTransformer {
@Test
public void testCreateTransformerNotPresent() throws IOException {
assertFalse(UtilHelpers.createTransformer(null).isPresent());
}
@Test
public void testCreateTransformerLoadOneClass() throws IOException {
Transformer transformer = UtilHelpers.createTransformer(Collections.singletonList(TransformerFoo.class.getName())).get();
assertTrue(transformer instanceof ChainedTransformer);
List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
assertEquals(1, transformerNames.size());
assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
}
@Test
public void testCreateTransformerLoadMultipleClasses() throws IOException {
List<String> classNames = Arrays.asList(TransformerFoo.class.getName(), TransformerBar.class.getName());
Transformer transformer = UtilHelpers.createTransformer(classNames).get();
assertTrue(transformer instanceof ChainedTransformer);
List<String> transformerNames = ((ChainedTransformer) transformer).getTransformersNames();
assertEquals(2, transformerNames.size());
assertEquals(TransformerFoo.class.getName(), transformerNames.get(0));
assertEquals(TransformerBar.class.getName(), transformerNames.get(1));
}
@Test
public void testCreateTransformerThrowsException() throws IOException {
Exception e = assertThrows(IOException.class, () -> {
UtilHelpers.createTransformer(Arrays.asList("foo", "bar"));
});
assertEquals("Could not load transformer class(es) [foo, bar]", e.getMessage());
}
}
}

View File

@@ -1378,7 +1378,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
assertEquals(1000, c);
}
private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) throws IOException {
private static void prepareJsonKafkaDFSFiles(int numRecords, boolean createTopic, String topicName) {
if (createTopic) {
try {
testUtils.createTopic(topicName, 2);
@@ -1491,7 +1491,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
props.setProperty("include", "base.properties");
props.setProperty("hoodie.embed.timeline.server", "false");
props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
props.setProperty("hoodie.datasource.write.partitionpath.field", "");
props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
@@ -1515,15 +1515,15 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
prepareParquetDFSFiles(parquetRecords, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, true, HoodieTestDataGenerator.TRIP_SCHEMA, HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
prepareParquetDFSSource(true, false, "source_uber.avsc", "target_uber.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false);
PARQUET_SOURCE_ROOT, false, "");
// delta streamer w/ parquet source
String tableBasePath = dfsBasePath + "/test_dfs_to_kafka" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_PARQUET, false,
Collections.emptyList(), PROPS_FILENAME_TEST_PARQUET, false,
false, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(parquetRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(parquetRecords, tableBasePath, sqlContext);
deltaStreamer.shutdownGracefully();
// prep json kafka source
@@ -1533,18 +1533,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
// delta streamer w/ json kafka source
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
// if auto reset value is set to LATEST, this all kafka records so far may not be synced.
int totalExpectedRecords = parquetRecords + ((autoResetToLatest) ? 0 : JSON_KAFKA_NUM_RECORDS);
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
// verify 2nd batch to test LATEST auto reset value.
prepareJsonKafkaDFSFiles(20, false, topicName);
totalExpectedRecords += 20;
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(totalExpectedRecords, tableBasePath, sqlContext);
testNum++;
}
@@ -1556,17 +1556,17 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null, "timestamp", null), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
int totalRecords = JSON_KAFKA_NUM_RECORDS;
int records = 10;
totalRecords += records;
prepareJsonKafkaDFSFiles(records, false, topicName);
deltaStreamer.sync();
TestHelpers.assertRecordCount(totalRecords, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(totalRecords, tableBasePath, sqlContext);
}
@Test
@@ -1578,20 +1578,20 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
String tableBasePath = dfsBasePath + "/test_json_kafka_table" + testNum;
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null,
null, "timestamp", String.valueOf(System.currentTimeMillis())), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS, tableBasePath, sqlContext);
prepareJsonKafkaDFSFiles(JSON_KAFKA_NUM_RECORDS, false, topicName);
deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, JsonKafkaSource.class.getName(),
Collections.EMPTY_LIST, PROPS_FILENAME_TEST_JSON_KAFKA, false,
Collections.emptyList(), PROPS_FILENAME_TEST_JSON_KAFKA, false,
true, 100000, false, null, null,
"timestamp", String.valueOf(System.currentTimeMillis())), jsc);
deltaStreamer.sync();
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath + "/*/*.parquet", sqlContext);
TestHelpers.assertRecordCount(JSON_KAFKA_NUM_RECORDS * 2, tableBasePath, sqlContext);
}
@Test

View File

@@ -161,8 +161,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
streamer.sync();
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext);
//insert updates for already existing records in kafka topics
testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
@@ -177,8 +177,8 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
assertTrue(streamer.getFailedTables().isEmpty());
//assert the record count matches now
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 + "/*/*.parquet", sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1, sqlContext);
TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2, sqlContext);
testNum++;
}

View File

@@ -73,8 +73,8 @@
<properties>
<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version>
<maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
<maven-failsafe-plugin.version>3.0.0-M4</maven-failsafe-plugin.version>
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
<maven-failsafe-plugin.version>3.0.0-M5</maven-failsafe-plugin.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
<maven-javadoc-plugin.version>3.1.1</maven-javadoc-plugin.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>