[HUDI-1728] Fix MethodNotFound for HiveMetastore Locks (#2731)
This commit is contained in:
@@ -639,7 +639,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
*/
|
*/
|
||||||
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
|
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
|
||||||
if (scheduleInline) {
|
if (scheduleInline) {
|
||||||
scheduleCleaningAtInstant(cleanInstantTime, Option.empty());
|
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
|
||||||
}
|
}
|
||||||
LOG.info("Cleaner started");
|
LOG.info("Cleaner started");
|
||||||
final Timer.Context timerContext = metrics.getCleanCtx();
|
final Timer.Context timerContext = metrics.getCleanCtx();
|
||||||
|
|||||||
@@ -93,7 +93,7 @@ public class LockManager implements Serializable, AutoCloseable {
|
|||||||
public synchronized LockProvider getLockProvider() {
|
public synchronized LockProvider getLockProvider() {
|
||||||
// Perform lazy initialization of lock provider only if needed
|
// Perform lazy initialization of lock provider only if needed
|
||||||
if (lockProvider == null) {
|
if (lockProvider == null) {
|
||||||
LOG.info("Lock Provider " + writeConfig.getLockProviderClass());
|
LOG.info("LockProvider " + writeConfig.getLockProviderClass());
|
||||||
lockProvider = (LockProvider) ReflectionUtils.loadClass(writeConfig.getLockProviderClass(),
|
lockProvider = (LockProvider) ReflectionUtils.loadClass(writeConfig.getLockProviderClass(),
|
||||||
lockConfiguration, hadoopConf.get());
|
lockConfiguration, hadoopConf.get());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -108,7 +108,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
|
|||||||
public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = "hoodie.compaction.reverse.log.read";
|
public static final String COMPACTION_REVERSE_LOG_READ_ENABLED_PROP = "hoodie.compaction.reverse.log.read";
|
||||||
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
|
public static final String DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED = "false";
|
||||||
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
|
private static final String DEFAULT_CLEANER_POLICY = HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name();
|
||||||
public static final String FAILED_WRITES_CLEANER_POLICY_PROP = "hoodie.failed.writes.cleaner.policy";
|
public static final String FAILED_WRITES_CLEANER_POLICY_PROP = "hoodie.cleaner.policy.failed.writes";
|
||||||
private static final String DEFAULT_FAILED_WRITES_CLEANER_POLICY =
|
private static final String DEFAULT_FAILED_WRITES_CLEANER_POLICY =
|
||||||
HoodieFailedWritesCleaningPolicy.EAGER.name();
|
HoodieFailedWritesCleaningPolicy.EAGER.name();
|
||||||
private static final String DEFAULT_AUTO_CLEAN = "true";
|
private static final String DEFAULT_AUTO_CLEAN = "true";
|
||||||
|
|||||||
@@ -1424,7 +1424,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
if (props.getProperty(WRITE_CONCURRENCY_MODE_PROP)
|
if (props.getProperty(WRITE_CONCURRENCY_MODE_PROP)
|
||||||
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
|
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name())) {
|
||||||
ValidationUtils.checkArgument(props.getProperty(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP)
|
ValidationUtils.checkArgument(props.getProperty(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP)
|
||||||
!= HoodieFailedWritesCleaningPolicy.EAGER.name());
|
!= HoodieFailedWritesCleaningPolicy.EAGER.name(), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,7 +51,6 @@ import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI
|
|||||||
import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
|
import static org.apache.hudi.common.config.LockConfiguration.HIVE_TABLE_NAME_PROP;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECTION_TIMEOUT_MS_PROP;
|
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
|
import static org.apache.hudi.common.config.LockConfiguration.ZK_CONNECT_URL_PROP;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
|
import static org.apache.hudi.common.config.LockConfiguration.ZK_PORT_PROP;
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
|
import static org.apache.hudi.common.config.LockConfiguration.ZK_SESSION_TIMEOUT_MS_PROP;
|
||||||
@@ -177,7 +176,8 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse
|
|||||||
throws InterruptedException, ExecutionException, TimeoutException, TException {
|
throws InterruptedException, ExecutionException, TimeoutException, TException {
|
||||||
LockRequest lockRequest = null;
|
LockRequest lockRequest = null;
|
||||||
try {
|
try {
|
||||||
final LockRequestBuilder builder = new LockRequestBuilder("hudi-lock");
|
// TODO : FIX:Using the parameterized constructor throws MethodNotFound
|
||||||
|
final LockRequestBuilder builder = new LockRequestBuilder();
|
||||||
lockRequest = builder.addLockComponent(lockComponent).setUser(System.getProperty("user.name")).build();
|
lockRequest = builder.addLockComponent(lockComponent).setUser(System.getProperty("user.name")).build();
|
||||||
lockRequest.setUserIsSet(true);
|
lockRequest.setUserIsSet(true);
|
||||||
final LockRequest lockRequestFinal = lockRequest;
|
final LockRequest lockRequestFinal = lockRequest;
|
||||||
@@ -203,9 +203,6 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse
|
|||||||
private void checkRequiredProps(final LockConfiguration lockConfiguration) {
|
private void checkRequiredProps(final LockConfiguration lockConfiguration) {
|
||||||
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(HIVE_DATABASE_NAME_PROP) != null);
|
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(HIVE_DATABASE_NAME_PROP) != null);
|
||||||
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(HIVE_TABLE_NAME_PROP) != null);
|
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(HIVE_TABLE_NAME_PROP) != null);
|
||||||
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP) != null);
|
|
||||||
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(ZK_SESSION_TIMEOUT_MS_PROP) != null);
|
|
||||||
ValidationUtils.checkArgument(lockConfiguration.getConfig().getString(ZK_CONNECTION_TIMEOUT_MS_PROP) != null);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setHiveLockConfs(HiveConf hiveConf) {
|
private void setHiveLockConfs(HiveConf hiveConf) {
|
||||||
@@ -217,9 +214,18 @@ public class HiveMetastoreBasedLockProvider implements LockProvider<LockResponse
|
|||||||
hiveConf.set("hive.lock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP));
|
hiveConf.set("hive.lock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP));
|
||||||
hiveConf.set("hive.unlock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP));
|
hiveConf.set("hive.unlock.numretries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_NUM_RETRIES_PROP));
|
||||||
hiveConf.set("hive.lock.sleep.between.retries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP));
|
hiveConf.set("hive.lock.sleep.between.retries", lockConfiguration.getConfig().getString(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP));
|
||||||
hiveConf.set("hive.zookeeper.quorum", lockConfiguration.getConfig().getString(ZK_CONNECT_URL_PROP));
|
String zkConnectUrl = lockConfiguration.getConfig().getOrDefault(ZK_CONNECT_URL_PROP, "").toString();
|
||||||
hiveConf.set("hive.zookeeper.client.port", lockConfiguration.getConfig().getString(ZK_PORT_PROP));
|
if (zkConnectUrl.length() > 0) {
|
||||||
hiveConf.set("hive.zookeeper.session.timeout", lockConfiguration.getConfig().getString(ZK_SESSION_TIMEOUT_MS_PROP));
|
hiveConf.set("hive.zookeeper.quorum", zkConnectUrl);
|
||||||
|
}
|
||||||
|
String zkPort = lockConfiguration.getConfig().getOrDefault(ZK_PORT_PROP, "").toString();
|
||||||
|
if (zkPort.length() > 0) {
|
||||||
|
hiveConf.set("hive.zookeeper.client.port", zkPort);
|
||||||
|
}
|
||||||
|
String zkSessionTimeout = lockConfiguration.getConfig().getOrDefault(ZK_SESSION_TIMEOUT_MS_PROP, "").toString();
|
||||||
|
if (zkSessionTimeout.length() > 0) {
|
||||||
|
hiveConf.set("hive.zookeeper.session.timeout", zkSessionTimeout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String generateLogSuffixString() {
|
private String generateLogSuffixString() {
|
||||||
|
|||||||
@@ -289,7 +289,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
|
|
||||||
props.setProperty("include", "base.properties");
|
props.setProperty("include", "base.properties");
|
||||||
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
|
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
|
||||||
props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
|
props.setProperty("hoodie.cleaner.policy.failed.writes", "LAZY");
|
||||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
|
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
|
||||||
props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
|
props.setProperty("hoodie.write.lock.hivemetastore.database", "testdb1");
|
||||||
props.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
|
props.setProperty("hoodie.write.lock.hivemetastore.table", "table1");
|
||||||
@@ -298,7 +298,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
|
|||||||
props.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
|
props.setProperty("hoodie.write.lock.wait_time_ms", "1200000");
|
||||||
props.setProperty("hoodie.write.lock.num_retries", "10");
|
props.setProperty("hoodie.write.lock.num_retries", "10");
|
||||||
props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
|
props.setProperty("hoodie.write.lock.zookeeper.lock_key", "test_table");
|
||||||
props.setProperty("hoodie.write.lock.zookeeper.zk_base_path", "/test");
|
props.setProperty("hoodie.write.lock.zookeeper.base_path", "/test");
|
||||||
|
|
||||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
|
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + propsFileName);
|
||||||
return props;
|
return props;
|
||||||
|
|||||||
Reference in New Issue
Block a user