1
0

[HUDI-1709] Improving config names and adding hive metastore uri config (#2699)

This commit is contained in:
n3nash
2021-03-22 01:22:06 -07:00
committed by GitHub
parent ce3e8ec870
commit d7b18783bd
7 changed files with 42 additions and 17 deletions

View File

@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP;
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
@@ -65,7 +66,7 @@ public class ZookeeperBasedLockProvider implements LockProvider<InterProcessMute
this.curatorFrameworkClient = CuratorFrameworkFactory.builder()
.connectString(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP))
.retryPolicy(new BoundedExponentialBackoffRetry(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
5000, lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP), lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP)))
.sessionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_SESSION_TIMEOUT_MS_PROP, DEFAULT_ZK_SESSION_TIMEOUT_MS))
.connectionTimeoutMs(lockConfiguration.getConfig().getInteger(ZK_CONNECTION_TIMEOUT_MS_PROP, DEFAULT_ZK_CONNECTION_TIMEOUT_MS))
.build();

View File

@@ -31,15 +31,18 @@ import java.util.Properties;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ACQUIRE_LOCK_WAIT_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_MAX_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_NUM_RETRIES;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
@@ -104,6 +107,11 @@ public class HoodieLockConfig extends DefaultHoodieConfig {
return this;
}
public HoodieLockConfig.Builder withHiveMetastoreURIs(String hiveMetastoreURIs) {
props.setProperty(HIVE_METASTORE_URI_PROP, hiveMetastoreURIs);
return this;
}
public HoodieLockConfig.Builder withZkQuorum(String zkQuorum) {
props.setProperty(ZK_CONNECT_URL_PROP, zkQuorum);
return this;
@@ -144,6 +152,11 @@ public class HoodieLockConfig extends DefaultHoodieConfig {
return this;
}
public HoodieLockConfig.Builder withRetryMaxWaitTimeInMillis(Long retryMaxWaitTimeInMillis) {
props.setProperty(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP, String.valueOf(retryMaxWaitTimeInMillis));
return this;
}
public HoodieLockConfig.Builder withClientNumRetries(int clientNumRetries) {
props.setProperty(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, String.valueOf(clientNumRetries));
return this;
@@ -174,6 +187,8 @@ public class HoodieLockConfig extends DefaultHoodieConfig {
LOCK_ACQUIRE_NUM_RETRIES_PROP, DEFAULT_LOCK_ACQUIRE_NUM_RETRIES);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP),
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP, DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP),
LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP, DEFAULT_LOCK_ACQUIRE_MAX_RETRY_WAIT_TIME_IN_MILLIS);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP),
LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP, DEFAULT_LOCK_ACQUIRE_CLIENT_NUM_RETRIES);
setDefaultOnCondition(props, !props.containsKey(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP),

View File

@@ -30,6 +30,8 @@ public class LockConfiguration implements Serializable {
public static final String LOCK_PREFIX = "hoodie.writer.lock.";
public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "wait_time_ms_between_retry";
public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "max_wait_time_ms_between_retry";
public static final String DEFAULT_LOCK_ACQUIRE_MAX_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP = LOCK_PREFIX + "client.wait_time_ms_between_retry";
public static final String DEFAULT_LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(10000L);
public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP = LOCK_PREFIX + "num_retries";
@@ -45,16 +47,17 @@ public class LockConfiguration implements Serializable {
public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";
public static final String HIVE_DATABASE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "database";
public static final String HIVE_TABLE_NAME_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "table";
public static final String HIVE_METASTORE_URI_PROP = HIVE_METASTORE_LOCK_PROPERTY_PREFIX + "uris";
// Zookeeper configs for zk based locks
public static final String ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "zookeeper.";
public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_base_path";
public static final String ZK_SESSION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_session_timeout_ms";
public static final String ZK_BASE_PATH_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "base_path";
public static final String ZK_SESSION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "session_timeout_ms";
public static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 60 * 1000;
public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "zk_connection_timeout_ms";
public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "connection_timeout_ms";
public static final int DEFAULT_ZK_CONNECTION_TIMEOUT_MS = 15 * 1000;
public static final String ZK_CONNECT_URL_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "url";
public static final String ZK_PORT_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "port";
public static final String ZK_LOCK_KEY_PROP = LOCK_PREFIX + "lock_key";
public static final String ZK_LOCK_KEY_PROP = ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX + "lock_key";
private final TypedProperties props;

View File

@@ -178,7 +178,7 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
props.setProperty("hoodie.writer.lock.zookeeper.port", "2828");
props.setProperty("hoodie.writer.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.writer.lock.num_retries", "10");
props.setProperty("hoodie.writer.lock.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/test");
return props;
}

View File

@@ -47,6 +47,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP;
import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
@@ -68,18 +69,19 @@ import static org.apache.hudi.common.lock.LockState.RELEASING;
* using hive metastore APIs. Users need to have a HiveMetastore & Zookeeper cluster deployed to be able to use this lock.
*
*/
public class HiveMetastoreLockProvider implements LockProvider<LockResponse> {
public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse> {
private static final Logger LOG = LogManager.getLogger(HiveMetastoreLockProvider.class);
private static final Logger LOG = LogManager.getLogger(HiveMetastoreBasedLockProvider.class);
private final String databaseName;
private final String tableName;
private final String hiveMetastoreUris;
private IMetaStoreClient hiveClient;
private volatile LockResponse lock = null;
protected LockConfiguration lockConfiguration;
ExecutorService executor = Executors.newSingleThreadExecutor();
public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
this(lockConfiguration);
try {
HiveConf hiveConf = new HiveConf();
@@ -91,16 +93,17 @@ public class HiveMetastoreLockProvider implements LockProvider<LockResponse> {
}
}
public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, final IMetaStoreClient metaStoreClient) {
public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final IMetaStoreClient metaStoreClient) {
this(lockConfiguration);
this.hiveClient = metaStoreClient;
}
HiveMetastoreLockProvider(final LockConfiguration lockConfiguration) {
HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
this.databaseName = this.lockConfiguration.getConfig().getString(HIVE_DATABASE_NAME_PROP);
this.tableName = this.lockConfiguration.getConfig().getString(HIVE_TABLE_NAME_PROP);
this.hiveMetastoreUris = this.lockConfiguration.getConfig().getOrDefault(HIVE_METASTORE_URI_PROP, "").toString();
}
@Override
@@ -206,6 +209,9 @@ public class HiveMetastoreLockProvider implements LockProvider<LockResponse> {
}
private void setHiveLockConfs(HiveConf hiveConf) {
if (!StringUtils.isNullOrEmpty(this.hiveMetastoreUris)) {
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, this.hiveMetastoreUris);
}
hiveConf.set("hive.support.concurrency", "true");
hiveConf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager");
hiveConf.set("hive.lock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP));

View File

@@ -54,7 +54,7 @@ import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT
* /metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java#L2892
* Unless this is set, we cannot use HiveMetastore server in tests for locking use-cases.
*/
public class TestHiveMetastoreLockProvider {
public class TestHiveMetastoreBasedLockProvider {
private static Connection connection;
private static LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, "testdb");
@@ -86,7 +86,7 @@ public class TestHiveMetastoreLockProvider {
@Test
public void testAcquireLock() throws Exception {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
@@ -106,7 +106,7 @@ public class TestHiveMetastoreLockProvider {
@Test
public void testUnlock() throws Exception {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
@@ -119,7 +119,7 @@ public class TestHiveMetastoreLockProvider {
@Test
public void testReentrantLock() throws Exception {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent));
@@ -135,7 +135,7 @@ public class TestHiveMetastoreLockProvider {
@Test
public void testUnlockWithoutLock() {
HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf());
lockComponent.setOperationType(DataOperationType.NO_TXN);
lockProvider.unlock();
}

View File

@@ -297,7 +297,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
props.setProperty("hoodie.writer.lock.zookeeper.port", "2828");
props.setProperty("hoodie.writer.lock.wait_time_ms", "1200000");
props.setProperty("hoodie.writer.lock.num_retries", "10");
props.setProperty("hoodie.writer.lock.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.lock_key", "test_table");
props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/test");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);