[HUDI-2964] Fixing aws lock configs to inherit from HoodieConfig (#4258)
This commit is contained in:
committed by
GitHub
parent
082faa3851
commit
7c3f0777aa
@@ -42,7 +42,7 @@ import org.apache.hudi.common.lock.LockProvider;
|
|||||||
import org.apache.hudi.common.lock.LockState;
|
import org.apache.hudi.common.lock.LockState;
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.common.util.ValidationUtils;
|
import org.apache.hudi.common.util.ValidationUtils;
|
||||||
import org.apache.hudi.config.AWSLockConfiguration;
|
import org.apache.hudi.config.DynamoDbBasedLockConfig;
|
||||||
import org.apache.hudi.exception.HoodieLockException;
|
import org.apache.hudi.exception.HoodieLockException;
|
||||||
|
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
@@ -80,8 +80,8 @@ public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
|
|||||||
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) {
|
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) {
|
||||||
checkRequiredProps(lockConfiguration);
|
checkRequiredProps(lockConfiguration);
|
||||||
this.lockConfiguration = lockConfiguration;
|
this.lockConfiguration = lockConfiguration;
|
||||||
this.tableName = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key());
|
this.tableName = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key());
|
||||||
this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key());
|
this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key());
|
||||||
long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
|
long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
|
||||||
if (dynamoDB == null) {
|
if (dynamoDB == null) {
|
||||||
dynamoDB = getDynamoDBClient();
|
dynamoDB = getDynamoDBClient();
|
||||||
@@ -155,7 +155,7 @@ public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private AmazonDynamoDB getDynamoDBClient() {
|
private AmazonDynamoDB getDynamoDBClient() {
|
||||||
String region = this.lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key());
|
String region = this.lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key());
|
||||||
String endpointURL = RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
|
String endpointURL = RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
|
||||||
AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
|
AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
|
||||||
new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
|
new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
|
||||||
@@ -166,7 +166,7 @@ public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) {
|
private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) {
|
||||||
String billingMode = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key());
|
String billingMode = lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key());
|
||||||
KeySchemaElement partitionKeyElement = new KeySchemaElement();
|
KeySchemaElement partitionKeyElement = new KeySchemaElement();
|
||||||
partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME);
|
partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME);
|
||||||
partitionKeyElement.setKeyType(KeyType.HASH);
|
partitionKeyElement.setKeyType(KeyType.HASH);
|
||||||
@@ -182,14 +182,14 @@ public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
|
|||||||
createTableRequest.setBillingMode(billingMode);
|
createTableRequest.setBillingMode(billingMode);
|
||||||
if (billingMode.equals(BillingMode.PROVISIONED.name())) {
|
if (billingMode.equals(BillingMode.PROVISIONED.name())) {
|
||||||
createTableRequest.setProvisionedThroughput(new ProvisionedThroughput()
|
createTableRequest.setProvisionedThroughput(new ProvisionedThroughput()
|
||||||
.withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key())))
|
.withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key())))
|
||||||
.withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key()))));
|
.withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key()))));
|
||||||
}
|
}
|
||||||
dynamoDB.createTable(createTableRequest);
|
dynamoDB.createTable(createTableRequest);
|
||||||
|
|
||||||
LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active");
|
LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active");
|
||||||
try {
|
try {
|
||||||
TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000);
|
TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000);
|
||||||
} catch (TableUtils.TableNeverTransitionedToStateException e) {
|
} catch (TableUtils.TableNeverTransitionedToStateException e) {
|
||||||
throw new HoodieLockException("Created dynamoDB table never transits to active", e);
|
throw new HoodieLockException("Created dynamoDB table never transits to active", e);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
@@ -199,14 +199,14 @@ public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void checkRequiredProps(final LockConfiguration config) {
|
private void checkRequiredProps(final LockConfiguration config) {
|
||||||
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()) != null);
|
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key()) != null);
|
||||||
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()) != null);
|
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key()) != null);
|
||||||
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()) != null);
|
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key()) != null);
|
||||||
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()) != null);
|
ValidationUtils.checkArgument(config.getConfig().getString(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key()) != null);
|
||||||
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
|
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
|
||||||
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "20");
|
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "20");
|
||||||
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10");
|
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10");
|
||||||
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000");
|
config.getConfig().putIfAbsent(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000");
|
||||||
}
|
}
|
||||||
|
|
||||||
private String generateLogSuffixString() {
|
private String generateLogSuffixString() {
|
||||||
|
|||||||
@@ -18,7 +18,10 @@
|
|||||||
|
|
||||||
package org.apache.hudi.config;
|
package org.apache.hudi.config;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.config.ConfigClassProperty;
|
||||||
|
import org.apache.hudi.common.config.ConfigGroups;
|
||||||
import org.apache.hudi.common.config.ConfigProperty;
|
import org.apache.hudi.common.config.ConfigProperty;
|
||||||
|
import org.apache.hudi.common.config.HoodieConfig;
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig;
|
import org.apache.hudi.common.table.HoodieTableConfig;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
|
|
||||||
@@ -27,7 +30,15 @@ import com.amazonaws.services.dynamodbv2.model.BillingMode;
|
|||||||
|
|
||||||
import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
|
import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
|
||||||
|
|
||||||
public class AWSLockConfiguration {
|
/**
|
||||||
|
* Hoodie Configs for Locks.
|
||||||
|
*/
|
||||||
|
@ConfigClassProperty(name = "DynamoDB based Locks Configurations",
|
||||||
|
groupName = ConfigGroups.Names.WRITE_CLIENT,
|
||||||
|
description = "Configs that control DynamoDB based locking mechanisms required for concurrency control "
|
||||||
|
+ " between writers to a Hudi table. Concurrency between Hudi's own table services "
|
||||||
|
+ " are auto managed internally.")
|
||||||
|
public class DynamoDbBasedLockConfig extends HoodieConfig {
|
||||||
|
|
||||||
// configs for DynamoDb based locks
|
// configs for DynamoDb based locks
|
||||||
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb.";
|
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb.";
|
||||||
@@ -29,12 +29,12 @@ import java.io.IOException;
|
|||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import javax.annotation.concurrent.Immutable;
|
import javax.annotation.concurrent.Immutable;
|
||||||
|
|
||||||
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE;
|
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE;
|
||||||
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY;
|
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY;
|
||||||
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY;
|
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY;
|
||||||
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_REGION;
|
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION;
|
||||||
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME;
|
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME;
|
||||||
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY;
|
import static org.apache.hudi.config.DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configurations used by the AWS credentials and AWS DynamoDB based lock.
|
* Configurations used by the AWS credentials and AWS DynamoDB based lock.
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
|
|||||||
import com.amazonaws.services.dynamodbv2.model.BillingMode;
|
import com.amazonaws.services.dynamodbv2.model.BillingMode;
|
||||||
import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
|
import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
|
||||||
import org.apache.hudi.common.config.LockConfiguration;
|
import org.apache.hudi.common.config.LockConfiguration;
|
||||||
import org.apache.hudi.config.AWSLockConfiguration;
|
import org.apache.hudi.config.DynamoDbBasedLockConfig;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions;
|
import org.junit.jupiter.api.Assertions;
|
||||||
import org.junit.jupiter.api.BeforeAll;
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
@@ -54,20 +54,20 @@ public class ITTestDynamoDBBasedLockProvider {
|
|||||||
@BeforeAll
|
@BeforeAll
|
||||||
public static void setup() throws InterruptedException {
|
public static void setup() throws InterruptedException {
|
||||||
Properties properties = new Properties();
|
Properties properties = new Properties();
|
||||||
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
|
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
|
||||||
// properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX);
|
// properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX);
|
||||||
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
|
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
|
||||||
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key(), REGION);
|
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key(), REGION);
|
||||||
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
|
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
|
||||||
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
|
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
|
||||||
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
|
properties.setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
|
||||||
lockConfiguration = new LockConfiguration(properties);
|
lockConfiguration = new LockConfiguration(properties);
|
||||||
dynamoDb = getDynamoClientWithLocalEndpoint();
|
dynamoDb = getDynamoClientWithLocalEndpoint();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAcquireLock() {
|
public void testAcquireLock() {
|
||||||
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
||||||
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
||||||
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
@@ -76,7 +76,7 @@ public class ITTestDynamoDBBasedLockProvider {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnlock() {
|
public void testUnlock() {
|
||||||
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
||||||
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
||||||
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
@@ -87,7 +87,7 @@ public class ITTestDynamoDBBasedLockProvider {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReentrantLock() {
|
public void testReentrantLock() {
|
||||||
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
||||||
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
||||||
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
|
||||||
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
|
||||||
@@ -98,7 +98,7 @@ public class ITTestDynamoDBBasedLockProvider {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUnlockWithoutLock() {
|
public void testUnlockWithoutLock() {
|
||||||
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
lockConfiguration.getConfig().setProperty(DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
|
||||||
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
|
||||||
dynamoDbBasedLockProvider.unlock();
|
dynamoDbBasedLockProvider.unlock();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user