1
0

[HUDI-1950] Move TestHiveMetastoreBasedLockProvider to functional (#3043)

HiveTestUtil's static setup of mini servers caused a connection-refused issue in the Azure CI environment, because TestHiveSyncTool and TestHiveMetastoreBasedLockProvider share the same test facilities. Moving TestHiveMetastoreBasedLockProvider (the easier one) to a functional test with a separate and improved mini-server setup resolved the issue.

Also cleaned up dfs cluster from HiveTestUtil.

The next step is to move TestHiveSyncTool to functional as well.
This commit is contained in:
Raymond Xu
2021-06-07 15:38:59 -07:00
committed by GitHub
parent f3d7b49bfe
commit 441076b2cc
7 changed files with 248 additions and 83 deletions

View File

@@ -178,6 +178,24 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-suite-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-commons</artifactId>
<scope>test</scope>
</dependency>
<!-- Hadoop - Test -->
<dependency>
<groupId>org.apache.hadoop</groupId>

View File

@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.NetworkTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.hive.util.ConfigUtils;
@@ -245,7 +246,6 @@ public class TestHiveSyncTool {
hiveClient = new HoodieHiveClient(HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
List<Partition> hivePartitions = hiveClient.scanTablePartitions(HiveTestUtil.hiveSyncConfig.tableName);
List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
writtenPartitionsSince.add(newPartition.get(0));
List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
assertEquals(1, partitionEvents.size(), "There should be only one partition event");
assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType,
@@ -687,7 +687,8 @@ public class TestHiveSyncTool {
HiveSyncConfig syncToolConfig = HiveSyncConfig.copy(HiveTestUtil.hiveSyncConfig);
syncToolConfig.ignoreExceptions = true;
syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl.replace("9999","9031");
syncToolConfig.jdbcUrl = HiveTestUtil.hiveSyncConfig.jdbcUrl
.replace(String.valueOf(HiveTestUtil.hiveTestService.getHiveServerPort()), String.valueOf(NetworkTestUtils.nextFreePort()));
HiveSyncTool tool = new HiveSyncTool(syncToolConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();

View File

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.hive.functional;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.platform.suite.api.IncludeTags;
import org.junit.platform.suite.api.SelectPackages;
import org.junit.runner.RunWith;
/**
 * JUnit 4 entry point that runs, via the JUnit Platform runner, all tests
 * tagged {@code "functional"} in the {@code org.apache.hudi.hive.functional}
 * package, so functional tests can be executed as a separate suite.
 */
@RunWith(JUnitPlatform.class)
@SelectPackages("org.apache.hudi.hive.functional")
@IncludeTags("functional")
public class HiveSyncFunctionalTestSuite {
}

View File

@@ -7,32 +7,32 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.hive;
package org.apache.hudi.hive.functional;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.HiveMetastoreBasedLockProvider;
import org.apache.hudi.hive.testutils.HiveSyncFunctionalTestHarness;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
@@ -54,39 +54,33 @@ import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT
* /metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java#L2892
* Unless this is set, we cannot use HiveMetastore server in tests for locking use-cases.
*/
public class TestHiveMetastoreBasedLockProvider {
@Tag("functional")
public class TestHiveMetastoreBasedLockProvider extends HiveSyncFunctionalTestHarness {
private static Connection connection;
private static LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, "testdb");
private static LockConfiguration lockConfiguration;
private static final String TEST_DB_NAME = "testdb";
private static final String TEST_TABLE_NAME = "testtable";
private LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, TEST_DB_NAME);
private LockConfiguration lockConfiguration;
@BeforeAll
public static void init() throws Exception {
HiveTestUtil.setUp();
createHiveConnection();
connection.createStatement().execute("create database if not exists testdb");
@BeforeEach
public void init() throws Exception {
TypedProperties properties = new TypedProperties();
properties.setProperty(HIVE_DATABASE_NAME_PROP, "testdb");
properties.setProperty(HIVE_TABLE_NAME_PROP, "testtable");
properties.setProperty(HIVE_DATABASE_NAME_PROP, TEST_DB_NAME);
properties.setProperty(HIVE_TABLE_NAME_PROP, TEST_TABLE_NAME);
properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP, DEFAULT_LOCK_ACQUIRE_NUM_RETRIES);
properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS);
properties.setProperty(ZK_CONNECT_URL_PROP, HiveTestUtil.getZkService().connectString());
properties.setProperty(ZK_PORT_PROP, HiveTestUtil.getHiveConf().get("hive.zookeeper.client.port"));
properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP, HiveTestUtil.getHiveConf().get("hive.zookeeper.session.timeout"));
properties.setProperty(ZK_CONNECT_URL_PROP, zkService().connectString());
properties.setProperty(ZK_PORT_PROP, hiveConf().get("hive.zookeeper.client.port"));
properties.setProperty(ZK_SESSION_TIMEOUT_MS_PROP, hiveConf().get("hive.zookeeper.session.timeout"));
properties.setProperty(ZK_CONNECTION_TIMEOUT_MS_PROP, String.valueOf(DEFAULT_ZK_CONNECTION_TIMEOUT_MS));
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP, String.valueOf(1000));
lockConfiguration = new LockConfiguration(properties);
lockComponent.setTablename("testtable");
}
@AfterAll
public static void cleanUpClass() {
HiveTestUtil.shutdown();
lockComponent.setTablename(TEST_TABLE_NAME);
}
@Test
public void testAcquireLock() throws Exception {
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
@@ -106,7 +100,7 @@ public class TestHiveMetastoreBasedLockProvider {
@Test
public void testUnlock() throws Exception {
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
@@ -119,7 +113,7 @@ public class TestHiveMetastoreBasedLockProvider {
@Test
public void testReentrantLock() throws Exception {
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
@@ -135,24 +129,9 @@ public class TestHiveMetastoreBasedLockProvider {
@Test
public void testUnlockWithoutLock() {
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, hiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
lockProvider.unlock();
}
private static void createHiveConnection() {
if (connection == null) {
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
throw new RuntimeException();
}
try {
connection = DriverManager.getConnection("jdbc:hive2://127.0.0.1:9999/");
} catch (SQLException e) {
throw new HoodieHiveSyncException("Cannot create hive connection ", e);
}
}
}
}

View File

@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.hive.testutils;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Instant;
import java.util.Collections;
/**
 * Base harness for Hive sync functional tests.
 *
 * <p>Lazily starts a shared Zookeeper and Hive mini service the first time any
 * test runs ({@link #runBeforeEach()}) and tears them down once after all tests
 * in the class complete ({@link #cleanUpAfterAll()}).
 */
public class HiveSyncFunctionalTestHarness {
// Shared across tests in the JVM; initialized lazily in runBeforeEach()
// and cleared in cleanUpAfterAll().
private static transient Configuration hadoopConf;
private static transient HiveTestService hiveTestService;
private static transient ZookeeperTestService zookeeperTestService;
/**
 * An indicator of the initialization status.
 */
protected boolean initialized = false;
@TempDir
protected java.nio.file.Path tempDir;
/** Returns the absolute path of the per-test temporary directory. */
public String basePath() {
return tempDir.toAbsolutePath().toString();
}
/** Returns the shared Hadoop configuration used by the mini services. */
public Configuration hadoopConf() {
return hadoopConf;
}
/** Returns a FileSystem bound to the shared Hadoop configuration. */
public FileSystem fs() throws IOException {
return FileSystem.get(hadoopConf);
}
/** Returns the shared Hive mini service. */
public HiveTestService hiveService() {
return hiveTestService;
}
/** Returns the HiveConf of the running HiveServer. */
public HiveConf hiveConf() {
return hiveTestService.getHiveServer().getHiveConf();
}
/** Returns the shared Zookeeper mini service. */
public ZookeeperTestService zkService() {
return zookeeperTestService;
}
/**
 * Builds a fresh HiveSyncConfig pointing at the running Hive mini service,
 * with a unique base path under the per-test temp dir.
 *
 * @throws IOException if the base-path directory cannot be created
 */
public HiveSyncConfig hiveSyncConf() throws IOException {
HiveSyncConfig conf = new HiveSyncConfig();
conf.jdbcUrl = hiveTestService.getJdbcHive2Url();
conf.hiveUser = "";
conf.hivePass = "";
conf.databaseName = "hivesynctestdb";
conf.tableName = "hivesynctesttable";
// Timestamp suffix keeps base paths unique across test cases within a run.
conf.basePath = Files.createDirectories(tempDir.resolve("hivesynctestcase-" + Instant.now().toEpochMilli())).toUri().toString();
conf.assumeDatePartitioning = true;
conf.usePreApacheInputFormat = false;
conf.partitionFields = Collections.singletonList("datestr");
return conf;
}
/**
 * Initializes a COPY_ON_WRITE Hudi table at the config's base path and
 * returns a HoodieHiveClient connected to the running Hive service.
 *
 * @param hiveSyncConfig sync config whose tableName/basePath seed the table
 * @throws IOException if table initialization or filesystem access fails
 */
public HoodieHiveClient hiveClient(HiveSyncConfig hiveSyncConfig) throws IOException {
HoodieTableMetaClient.withPropertyBuilder()
.setTableType(HoodieTableType.COPY_ON_WRITE)
.setTableName(hiveSyncConfig.tableName)
.setPayloadClass(HoodieAvroPayload.class)
.initTable(hadoopConf, hiveSyncConfig.basePath);
return new HoodieHiveClient(hiveSyncConfig, hiveConf(), fs());
}
/**
 * Drops the given tables (if they exist) from the given database via Hive SQL.
 */
public void dropTables(String database, String... tables) throws IOException {
HiveSyncConfig hiveSyncConfig = hiveSyncConf();
hiveSyncConfig.databaseName = database;
for (String table : tables) {
hiveSyncConfig.tableName = table;
hiveClient(hiveSyncConfig).updateHiveSQL("drop table if exists " + table);
}
}
/**
 * Drops the given databases (if they exist) via Hive SQL.
 */
public void dropDatabases(String... databases) throws IOException {
HiveSyncConfig hiveSyncConfig = hiveSyncConf();
for (String database : databases) {
hiveSyncConfig.databaseName = database;
hiveClient(hiveSyncConfig).updateHiveSQL("drop database if exists " + database);
}
}
/**
 * Starts the Zookeeper and Hive mini services once per JVM; subsequent
 * tests reuse the already-running services. Synchronized to guard the
 * lazy static initialization.
 */
@BeforeEach
public synchronized void runBeforeEach() throws IOException, InterruptedException {
initialized = hiveTestService != null && zookeeperTestService != null;
if (!initialized) {
hadoopConf = new Configuration();
// Zookeeper must be up before Hive, which registers against it.
zookeeperTestService = new ZookeeperTestService(hadoopConf);
zookeeperTestService.start();
hiveTestService = new HiveTestService(hadoopConf);
hiveTestService.start();
}
}
/**
 * Stops the mini services and resets the static state so a later test
 * class can re-initialize from scratch.
 */
@AfterAll
public static synchronized void cleanUpAfterAll() {
if (hiveTestService != null) {
hiveTestService.stop();
hiveTestService = null;
}
if (zookeeperTestService != null) {
zookeeperTestService.stop();
zookeeperTestService = null;
}
if (hadoopConf != null) {
hadoopConf.clear();
hadoopConf = null;
}
}
}

View File

@@ -79,8 +79,9 @@ public class HiveTestService {
private TServer tServer;
private HiveServer2 hiveServer;
public HiveTestService(Configuration configuration) throws IOException {
public HiveTestService(Configuration hadoopConf) throws IOException {
this.workDir = Files.createTempDirectory(System.currentTimeMillis() + "-").toFile().getAbsolutePath();
this.hadoopConf = hadoopConf;
}
public Configuration getHadoopConf() {
@@ -152,6 +153,14 @@ public class HiveTestService {
return hiveServer;
}
public int getHiveServerPort() {
return serverPort;
}
public String getJdbcHive2Url() {
return String.format("jdbc:hive2://%s:%s/default", bindIP, serverPort);
}
public HiveConf configureHive(Configuration conf, String localHiveLocation) throws IOException {
conf.set("hive.metastore.local", "false");
conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + ":" + metastorePort);

View File

@@ -38,7 +38,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.common.testutils.minicluster.ZookeeperTestService;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.hive.HiveSyncConfig;
@@ -51,7 +50,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.server.HiveServer2;
import org.apache.parquet.avro.AvroSchemaConverter;
@@ -67,6 +65,8 @@ import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -82,10 +82,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@SuppressWarnings("SameParameterValue")
public class HiveTestUtil {
private static MiniDFSCluster dfsCluster;
private static ZooKeeperServer zkServer;
private static HiveServer2 hiveServer;
private static HiveTestService hiveTestService;
public static HiveTestService hiveTestService;
private static ZookeeperTestService zkService;
private static Configuration configuration;
public static HiveSyncConfig hiveSyncConfig;
@@ -94,11 +93,7 @@ public class HiveTestUtil {
private static Set<String> createdTablesSet = new HashSet<>();
public static void setUp() throws IOException, InterruptedException {
if (dfsCluster == null) {
HdfsTestService service = new HdfsTestService();
dfsCluster = service.start(true);
configuration = service.getHadoopConf();
}
configuration = new Configuration();
if (zkServer == null) {
zkService = new ZookeeperTestService(configuration);
zkServer = zkService.start();
@@ -110,12 +105,12 @@ public class HiveTestUtil {
fileSystem = FileSystem.get(configuration);
hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
hiveSyncConfig.jdbcUrl = hiveTestService.getJdbcHive2Url();
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb";
hiveSyncConfig.tableName = "test1";
hiveSyncConfig.basePath = "/tmp/hdfs/TestHiveSyncTool/";
hiveSyncConfig.basePath = Files.createTempDirectory("hivesynctest" + Instant.now().toEpochMilli()).toUri().toString();
hiveSyncConfig.assumeDatePartitioning = true;
hiveSyncConfig.usePreApacheInputFormat = false;
hiveSyncConfig.partitionFields = Collections.singletonList("datestr");
@@ -146,18 +141,6 @@ public class HiveTestUtil {
return hiveServer.getHiveConf();
}
public static HiveServer2 getHiveServer() {
return hiveServer;
}
public static ZooKeeperServer getZkServer() {
return zkServer;
}
public static ZookeeperTestService getZkService() {
return zkService;
}
public static void shutdown() {
if (hiveServer != null) {
hiveServer.stop();
@@ -165,9 +148,6 @@ public class HiveTestUtil {
if (hiveTestService != null) {
hiveTestService.stop();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
if (zkServer != null) {
zkServer.shutdown();
}