diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java index 60336c53e..8a80685ed 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/ZookeeperBasedLockProvider.java @@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP; import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP; import static org.apache.hudi.common.config.LockConfiguration.ZK_BASE_PATH_PROP; import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP; @@ -65,7 +66,7 @@ public class ZookeeperBasedLockProvider implements LockProvider { +public class HiveMetastoreBasedLockProvider implements LockProvider { - private static final Logger LOG = LogManager.getLogger(HiveMetastoreLockProvider.class); + private static final Logger LOG = LogManager.getLogger(HiveMetastoreBasedLockProvider.class); private final String databaseName; private final String tableName; + private final String hiveMetastoreUris; private IMetaStoreClient hiveClient; private volatile LockResponse lock = null; protected LockConfiguration lockConfiguration; ExecutorService executor = Executors.newSingleThreadExecutor(); - public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { + public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) { this(lockConfiguration); try { HiveConf hiveConf = new HiveConf(); @@ -91,16 +93,17 @@ public class HiveMetastoreLockProvider implements LockProvider { } } - public HiveMetastoreLockProvider(final LockConfiguration lockConfiguration, final IMetaStoreClient metaStoreClient) { + public HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration, final IMetaStoreClient metaStoreClient) { this(lockConfiguration); this.hiveClient = metaStoreClient; } - HiveMetastoreLockProvider(final LockConfiguration lockConfiguration) { + HiveMetastoreBasedLockProvider(final LockConfiguration lockConfiguration) { checkRequiredProps(lockConfiguration); this.lockConfiguration = lockConfiguration; this.databaseName = this.lockConfiguration.getConfig().getString(HIVE_DATABASE_NAME_PROP); this.tableName = this.lockConfiguration.getConfig().getString(HIVE_TABLE_NAME_PROP); + this.hiveMetastoreUris = this.lockConfiguration.getConfig().getOrDefault(HIVE_METASTORE_URI_PROP, "").toString(); } @Override @@ -206,6 +209,9 @@ public class HiveMetastoreLockProvider implements LockProvider { } private void setHiveLockConfs(HiveConf hiveConf) { + if (!StringUtils.isNullOrEmpty(this.hiveMetastoreUris)) { + hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, this.hiveMetastoreUris); + } hiveConf.set("hive.support.concurrency", "true"); hiveConf.set("hive.lock.manager", "org.apache.hadoop.hive.ql.lockmgr.zookeeper.ZooKeeperHiveLockManager"); hiveConf.set("hive.lock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP)); diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveMetastoreLockProvider.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveMetastoreBasedLockProvider.java similarity index 92% rename from hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveMetastoreLockProvider.java rename to hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveMetastoreBasedLockProvider.java index 51f7814c3..4b1c7f0d7 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveMetastoreLockProvider.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveMetastoreBasedLockProvider.java @@ -54,7 +54,7 @@ import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT * /metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java#L2892 * Unless this is set, we cannot use HiveMetastore server in tests for locking use-cases. */ -public class TestHiveMetastoreLockProvider { +public class TestHiveMetastoreBasedLockProvider { private static Connection connection; private static LockComponent lockComponent = new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, "testdb"); @@ -86,7 +86,7 @@ public class TestHiveMetastoreLockProvider { @Test public void testAcquireLock() throws Exception { - HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent)); @@ -106,7 +106,7 @@ public class TestHiveMetastoreLockProvider { @Test public void testUnlock() throws Exception { - HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent)); @@ -119,7 +119,7 @@ public class TestHiveMetastoreLockProvider { @Test public void testReentrantLock() throws Exception { - HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); Assertions.assertTrue(lockProvider.acquireLock(lockConfiguration.getConfig() .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP), TimeUnit.MILLISECONDS, lockComponent)); @@ -135,7 +135,7 @@ public class TestHiveMetastoreLockProvider { @Test public void testUnlockWithoutLock() { - HiveMetastoreLockProvider lockProvider = new HiveMetastoreLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); + HiveMetastoreBasedLockProvider lockProvider = new HiveMetastoreBasedLockProvider(lockConfiguration, HiveTestUtil.getHiveConf()); lockComponent.setOperationType(DataOperationType.NO_TXN); lockProvider.unlock(); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 29dd3ae4d..359d7c077 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -297,7 +297,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase { props.setProperty("hoodie.writer.lock.zookeeper.port", "2828"); props.setProperty("hoodie.writer.lock.wait_time_ms", "1200000"); props.setProperty("hoodie.writer.lock.num_retries", "10"); - props.setProperty("hoodie.writer.lock.lock_key", "test_table"); + props.setProperty("hoodie.writer.lock.zookeeper.lock_key", "test_table"); props.setProperty("hoodie.writer.lock.zookeeper.zk_base_path", "/test"); UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);