1
0

[HUDI-2314] Add support for DynamoDb based lock provider (#3486)

- Co-authored-by: Wenning Ding <wenningd@amazon.com>
- Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
wenningd
2021-11-17 09:09:31 -08:00
committed by GitHub
parent 826414cff5
commit 1ee12cfa6f
16 changed files with 982 additions and 3 deletions

View File

@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws.credentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* Factory class for Hoodie AWSCredentialsProvider.
*/
public class HoodieAWSCredentialsProviderFactory {
public static AWSCredentialsProvider getAwsCredentialsProvider(Properties props) {
return getAwsCredentialsProviderChain(props);
}
private static AWSCredentialsProvider getAwsCredentialsProviderChain(Properties props) {
List<AWSCredentialsProvider> providers = new ArrayList<>();
providers.add(new HoodieConfigAWSCredentialsProvider(props));
providers.add(new DefaultAWSCredentialsProviderChain());
AWSCredentialsProviderChain providerChain = new AWSCredentialsProviderChain(providers);
providerChain.setReuseLastProvider(true);
return providerChain;
}
}

View File

@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws.credentials;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.BasicSessionCredentials;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Properties;
/**
* Credentials provider which fetches AWS access key from Hoodie config.
*/
public class HoodieConfigAWSCredentialsProvider implements AWSCredentialsProvider {
private static final Logger LOG = LogManager.getLogger(HoodieConfigAWSCredentialsProvider.class);
private AWSCredentials awsCredentials;
public HoodieConfigAWSCredentialsProvider(Properties props) {
String accessKey = props.getProperty(HoodieAWSConfig.AWS_ACCESS_KEY.key());
String secretKey = props.getProperty(HoodieAWSConfig.AWS_SECRET_KEY.key());
String sessionToken = props.getProperty(HoodieAWSConfig.AWS_SESSION_TOKEN.key());
if (StringUtils.isNullOrEmpty(accessKey) || StringUtils.isNullOrEmpty(secretKey)) {
LOG.debug("AWS access key or secret key not found in the Hudi configuration. "
+ "Use default AWS credentials");
} else {
this.awsCredentials = createCredentials(accessKey, secretKey, sessionToken);
}
}
private static AWSCredentials createCredentials(String accessKey, String secretKey,
String sessionToken) {
return (sessionToken == null)
? new BasicAWSCredentials(accessKey, secretKey)
: new BasicSessionCredentials(accessKey, secretKey, sessionToken);
}
@Override
public AWSCredentials getCredentials() {
return this.awsCredentials;
}
@Override
public void refresh() {
}
}

View File

@@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws.transaction.lock;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.dynamodbv2.AcquireLockOptions;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBLockClientOptions;
import com.amazonaws.services.dynamodbv2.LockItem;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.LockNotGrantedException;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType;
import com.amazonaws.services.dynamodbv2.util.TableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.lock.LockProvider;
import org.apache.hudi.common.lock.LockState;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.AWSLockConfiguration;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
/**
* A DynamoDB based lock. This {@link LockProvider} implementation allows to lock table operations
* using DynamoDB. Users need to have access to AWS DynamoDB to be able to use this lock.
*/
@NotThreadSafe
public class DynamoDBBasedLockProvider implements LockProvider<LockItem> {
private static final Logger LOG = LogManager.getLogger(DynamoDBBasedLockProvider.class);
private static final String DYNAMODB_ATTRIBUTE_NAME = "key";
private final AmazonDynamoDBLockClient client;
private final String tableName;
private final String dynamoDBPartitionKey;
protected LockConfiguration lockConfiguration;
private volatile LockItem lock;
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf) {
this(lockConfiguration, conf, null);
}
public DynamoDBBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration conf, AmazonDynamoDB dynamoDB) {
checkRequiredProps(lockConfiguration);
this.lockConfiguration = lockConfiguration;
this.tableName = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key());
this.dynamoDBPartitionKey = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key());
long leaseDuration = Long.parseLong(lockConfiguration.getConfig().getString(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY));
if (dynamoDB == null) {
dynamoDB = getDynamoDBClient();
}
// build the dynamoDb lock client
this.client = new AmazonDynamoDBLockClient(
AmazonDynamoDBLockClientOptions.builder(dynamoDB, tableName)
.withTimeUnit(TimeUnit.MILLISECONDS)
.withLeaseDuration(leaseDuration)
.withHeartbeatPeriod(leaseDuration / 3)
.withCreateHeartbeatBackgroundThread(true)
.build());
if (!this.client.lockTableExists()) {
createLockTableInDynamoDB(dynamoDB, tableName);
}
}
@Override
public boolean tryLock(long time, TimeUnit unit) {
LOG.info(generateLogStatement(LockState.ACQUIRING, generateLogSuffixString()));
try {
lock = client.acquireLock(AcquireLockOptions.builder(dynamoDBPartitionKey)
.withAdditionalTimeToWaitForLock(time)
.withTimeUnit(TimeUnit.MILLISECONDS)
.build());
LOG.info(generateLogStatement(LockState.ACQUIRED, generateLogSuffixString()));
} catch (InterruptedException e) {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_ACQUIRE, generateLogSuffixString()), e);
} catch (LockNotGrantedException e) {
return false;
}
return lock != null && !lock.isExpired();
}
@Override
public void unlock() {
try {
LOG.info(generateLogStatement(LockState.RELEASING, generateLogSuffixString()));
if (lock == null) {
return;
}
if (!client.releaseLock(lock)) {
LOG.warn("The lock has already been stolen");
}
lock = null;
LOG.info(generateLogStatement(LockState.RELEASED, generateLogSuffixString()));
} catch (Exception e) {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString()), e);
}
}
@Override
public void close() {
try {
if (lock != null) {
if (!client.releaseLock(lock)) {
LOG.warn("The lock has already been stolen");
}
lock = null;
}
this.client.close();
} catch (Exception e) {
LOG.error(generateLogStatement(LockState.FAILED_TO_RELEASE, generateLogSuffixString()));
}
}
@Override
public LockItem getLock() {
return lock;
}
private AmazonDynamoDB getDynamoDBClient() {
String region = this.lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key());
String endpointURL = RegionUtils.getRegion(region).getServiceEndpoint(AmazonDynamoDB.ENDPOINT_PREFIX);
AwsClientBuilder.EndpointConfiguration dynamodbEndpoint =
new AwsClientBuilder.EndpointConfiguration(endpointURL, region);
return AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(dynamodbEndpoint)
.withCredentials(HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(lockConfiguration.getConfig()))
.build();
}
private void createLockTableInDynamoDB(AmazonDynamoDB dynamoDB, String tableName) {
String billingMode = lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key());
KeySchemaElement partitionKeyElement = new KeySchemaElement();
partitionKeyElement.setAttributeName(DYNAMODB_ATTRIBUTE_NAME);
partitionKeyElement.setKeyType(KeyType.HASH);
List<KeySchemaElement> keySchema = new ArrayList<>();
keySchema.add(partitionKeyElement);
Collection<AttributeDefinition> attributeDefinitions = new ArrayList<>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName(DYNAMODB_ATTRIBUTE_NAME).withAttributeType(ScalarAttributeType.S));
CreateTableRequest createTableRequest = new CreateTableRequest(tableName, keySchema);
createTableRequest.setAttributeDefinitions(attributeDefinitions);
createTableRequest.setBillingMode(billingMode);
if (billingMode.equals(BillingMode.PROVISIONED.name())) {
createTableRequest.setProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key())))
.withWriteCapacityUnits(Long.parseLong(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key()))));
}
dynamoDB.createTable(createTableRequest);
LOG.info("Creating dynamoDB table " + tableName + ", waiting for table to be active");
try {
TableUtils.waitUntilActive(dynamoDB, tableName, Integer.parseInt(lockConfiguration.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key())), 20 * 1000);
} catch (TableUtils.TableNeverTransitionedToStateException e) {
throw new HoodieLockException("Created dynamoDB table never transits to active", e);
} catch (InterruptedException e) {
throw new HoodieLockException("Thread interrupted while waiting for dynamoDB table to turn active", e);
}
LOG.info("Created dynamoDB table " + tableName);
}
private void checkRequiredProps(final LockConfiguration config) {
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key()) != null);
ValidationUtils.checkArgument(config.getConfig().getString(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key()) != null);
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "20");
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "10");
config.getConfig().putIfAbsent(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT.key(), "600000");
}
private String generateLogSuffixString() {
return StringUtils.join("DynamoDb table = ", tableName, ", partition key = ", dynamoDBPartitionKey);
}
protected String generateLogStatement(LockState state, String suffix) {
return StringUtils.join(state.name(), " lock at ", suffix);
}
}

View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.util.Option;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_PREFIX;
public class AWSLockConfiguration {
// configs for DynamoDb based locks
public static final String DYNAMODB_BASED_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "dynamodb.";
public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_NAME = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table")
.noDefaultValue()
.withDocumentation("For DynamoDB based lock provider, the name of the DynamoDB table acting as lock table");
public static final ConfigProperty<String> DYNAMODB_LOCK_PARTITION_KEY = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key")
.noDefaultValue()
.withInferFunction(cfg -> {
if (cfg.contains(HoodieWriteConfig.TBL_NAME)) {
return Option.of(cfg.getString(HoodieWriteConfig.TBL_NAME));
}
return Option.empty();
})
.withDocumentation("For DynamoDB based lock provider, the partition key for the DynamoDB lock table. "
+ "Each Hudi dataset should has it's unique key so concurrent writers could refer to the same partition key."
+ " By default we use the Hudi table name specified to be the partition key");
public static final ConfigProperty<String> DYNAMODB_LOCK_REGION = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "region")
.defaultValue("us-east-1")
.withInferFunction(cfg -> {
String regionFromEnv = System.getenv("AWS_REGION");
if (regionFromEnv != null) {
return Option.of(RegionUtils.getRegion(regionFromEnv).getName());
}
return Option.empty();
})
.withDocumentation("For DynamoDB based lock provider, the region used in endpoint for Amazon DynamoDB service."
+ " Would try to first get it from AWS_REGION environment variable. If not find, by default use us-east-1");
public static final ConfigProperty<String> DYNAMODB_LOCK_BILLING_MODE = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "billing_mode")
.defaultValue(BillingMode.PAY_PER_REQUEST.name())
.withDocumentation("For DynamoDB based lock provider, by default it is PAY_PER_REQUEST mode");
public static final ConfigProperty<String> DYNAMODB_LOCK_READ_CAPACITY = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "read_capacity")
.defaultValue("20")
.withDocumentation("For DynamoDB based lock provider, read capacity units when using PROVISIONED billing mode");
public static final ConfigProperty<String> DYNAMODB_LOCK_WRITE_CAPACITY = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "write_capacity")
.defaultValue("10")
.withDocumentation("For DynamoDB based lock provider, write capacity units when using PROVISIONED billing mode");
public static final ConfigProperty<String> DYNAMODB_LOCK_TABLE_CREATION_TIMEOUT = ConfigProperty
.key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "table_creation_timeout")
.defaultValue(String.valueOf(10 * 60 * 1000))
.withDocumentation("For DynamoDB based lock provider, the maximum number of milliseconds to wait for creating DynamoDB table");
}

View File

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import javax.annotation.concurrent.Immutable;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_REGION;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME;
import static org.apache.hudi.config.AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY;
/**
* Configurations used by the AWS credentials and AWS DynamoDB based lock.
*/
@Immutable
@ConfigClassProperty(name = "AWS credential Configs",
groupName = ConfigGroups.Names.AWS_DYNAMO_DB,
description = "Configurations used for AWS credentials to get AWS resources.")
public class HoodieAWSConfig extends HoodieConfig {
public static final ConfigProperty<String> AWS_ACCESS_KEY = ConfigProperty
.key("hoodie.aws.access.key")
.noDefaultValue()
.withDocumentation("AWS access key id");
public static final ConfigProperty<String> AWS_SECRET_KEY = ConfigProperty
.key("hoodie.aws.secret.key")
.noDefaultValue()
.withDocumentation("AWS secret key");
public static final ConfigProperty<String> AWS_SESSION_TOKEN = ConfigProperty
.key("hoodie.aws.session.token")
.noDefaultValue()
.withDocumentation("AWS session token");
private HoodieAWSConfig() {
super();
}
public static HoodieAWSConfig.Builder newBuilder() {
return new HoodieAWSConfig.Builder();
}
public String getAWSAccessKey() {
return getString(AWS_ACCESS_KEY);
}
public String getAWSSecretKey() {
return getString(AWS_SECRET_KEY);
}
public String getAWSSessionToken() {
return getString(AWS_SESSION_TOKEN);
}
public static class Builder {
private final HoodieAWSConfig awsConfig = new HoodieAWSConfig();
public HoodieAWSConfig.Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.awsConfig.getProps().load(reader);
return this;
}
}
public HoodieAWSConfig.Builder fromProperties(Properties props) {
this.awsConfig.getProps().putAll(props);
return this;
}
public HoodieAWSConfig.Builder withAccessKey(String accessKey) {
awsConfig.setValue(AWS_ACCESS_KEY, accessKey);
return this;
}
public HoodieAWSConfig.Builder withSecretKey(String secretKey) {
awsConfig.setValue(AWS_SECRET_KEY, secretKey);
return this;
}
public HoodieAWSConfig.Builder withSessionToken(String sessionToken) {
awsConfig.setValue(AWS_SESSION_TOKEN, sessionToken);
return this;
}
public Builder withDynamoDBTable(String dynamoDbTableName) {
awsConfig.setValue(DYNAMODB_LOCK_TABLE_NAME, dynamoDbTableName);
return this;
}
public Builder withDynamoDBPartitionKey(String partitionKey) {
awsConfig.setValue(DYNAMODB_LOCK_PARTITION_KEY, partitionKey);
return this;
}
public Builder withDynamoDBRegion(String region) {
awsConfig.setValue(DYNAMODB_LOCK_REGION, region);
return this;
}
public Builder withDynamoDBBillingMode(String mode) {
awsConfig.setValue(DYNAMODB_LOCK_BILLING_MODE, mode);
return this;
}
public Builder withDynamoDBReadCapacity(String capacity) {
awsConfig.setValue(DYNAMODB_LOCK_READ_CAPACITY, capacity);
return this;
}
public Builder withDynamoDBWriteCapacity(String capacity) {
awsConfig.setValue(DYNAMODB_LOCK_WRITE_CAPACITY, capacity);
return this;
}
public HoodieAWSConfig build() {
awsConfig.setDefaults(HoodieAWSConfig.class.getName());
return awsConfig;
}
}
}

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws;
import com.amazonaws.auth.BasicSessionCredentials;
import org.apache.hudi.config.HoodieAWSConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TestHoodieAWSCredentialsProviderFactory {
@Test
public void testGetAWSCredentials() {
HoodieConfig cfg = new HoodieConfig();
cfg.setValue(HoodieAWSConfig.AWS_ACCESS_KEY, "random-access-key");
cfg.setValue(HoodieAWSConfig.AWS_SECRET_KEY, "random-secret-key");
cfg.setValue(HoodieAWSConfig.AWS_SESSION_TOKEN, "random-session-token");
BasicSessionCredentials credentials = (BasicSessionCredentials) org.apache.hudi.aws.credentials.HoodieAWSCredentialsProviderFactory.getAwsCredentialsProvider(cfg.getProps()).getCredentials();
assertEquals("random-access-key", credentials.getAWSAccessKeyId());
assertEquals("random-secret-key", credentials.getAWSSecretKey());
assertEquals("random-session-token", credentials.getSessionToken());
}
}

View File

@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.aws.transaction.integ;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.BillingMode;
import org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.config.AWSLockConfiguration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.UUID;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY;
/**
* Test for {@link DynamoDBBasedLockProvider}.
* Set it as integration test because it requires setting up docker environment.
*/
public class ITTestDynamoDBBasedLockProvider {
private static LockConfiguration lockConfiguration;
private static AmazonDynamoDB dynamoDb;
private static final String TABLE_NAME_PREFIX = "testDDBTable-";
private static final String REGION = "us-east-2";
@BeforeAll
public static void setup() throws InterruptedException {
Properties properties = new Properties();
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_BILLING_MODE.key(), BillingMode.PAY_PER_REQUEST.name());
// properties.setProperty(AWSLockConfig.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX);
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_PARTITION_KEY.key(), "testKey");
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_REGION.key(), REGION);
properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000");
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_READ_CAPACITY.key(), "0");
properties.setProperty(AWSLockConfiguration.DYNAMODB_LOCK_WRITE_CAPACITY.key(), "0");
lockConfiguration = new LockConfiguration(properties);
dynamoDb = getDynamoClientWithLocalEndpoint();
}
@Test
public void testAcquireLock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
dynamoDbBasedLockProvider.unlock();
}
@Test
public void testUnlock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
dynamoDbBasedLockProvider.unlock();
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
}
@Test
public void testReentrantLock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
Assertions.assertTrue(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
Assertions.assertFalse(dynamoDbBasedLockProvider.tryLock(lockConfiguration.getConfig()
.getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS));
dynamoDbBasedLockProvider.unlock();
}
@Test
public void testUnlockWithoutLock() {
lockConfiguration.getConfig().setProperty(AWSLockConfiguration.DYNAMODB_LOCK_TABLE_NAME.key(), TABLE_NAME_PREFIX + UUID.randomUUID());
DynamoDBBasedLockProvider dynamoDbBasedLockProvider = new DynamoDBBasedLockProvider(lockConfiguration, null, dynamoDb);
dynamoDbBasedLockProvider.unlock();
}
private static AmazonDynamoDB getDynamoClientWithLocalEndpoint() {
String endpoint = System.getProperty("dynamodb-local.endpoint");
if (endpoint == null || endpoint.isEmpty()) {
throw new IllegalStateException("dynamodb-local.endpoint system property not set");
}
return AmazonDynamoDBClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, REGION))
.withCredentials(getCredentials())
.build();
}
private static AWSCredentialsProvider getCredentials() {
return new AWSStaticCredentialsProvider(new BasicAWSCredentials("random-access-key", "random-secret-key"));
}
}

View File

@@ -0,0 +1,25 @@
###
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###
log4j.rootLogger=WARN, A1
log4j.category.org.apache=INFO
log4j.category.org.apache.parquet.hadoop=WARN
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n