1
0

[HUDI-1897] Deltastreamer source for AWS S3 (#3433)

- Added two sources for a two-stage pipeline: a. S3EventsSource, which fetches events from SQS and ingests them into a meta hoodie table; b. S3EventsHoodieIncrSource, which reads S3 events from this meta hoodie table, fetches the actual objects from S3 and ingests them into the sink hoodie table.
- Added selectors to assist in S3EventsSource. 

Co-authored-by: Satish M <84978833+satishmittal1111@users.noreply.github.com>
Co-authored-by: Vinoth Chandar <vinoth@apache.org>
This commit is contained in:
Sagar Sumit
2021-08-14 17:55:10 +05:30
committed by GitHub
parent 9056c68744
commit 5cc96e85c1
12 changed files with 1348 additions and 13 deletions

View File

@@ -402,6 +402,14 @@
<scope>test</scope>
</dependency>
<!-- AWS Services -->
<!-- https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-sqs -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sqs</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<!-- Hive - Test -->
<dependency>
<groupId>${hive.groupid}</groupId>

View File

@@ -42,45 +42,51 @@ public class HoodieIncrSource extends RowSource {
private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
protected static class Config {
static class Config {
/**
* {@value #HOODIE_SRC_BASE_PATH} is the base-path for the source Hoodie table.
*/
private static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path";
static final String HOODIE_SRC_BASE_PATH = "hoodie.deltastreamer.source.hoodieincr.path";
/**
* {@value #NUM_INSTANTS_PER_FETCH} allows the max number of instants whose changes can be incrementally fetched.
*/
private static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants";
private static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
static final String NUM_INSTANTS_PER_FETCH = "hoodie.deltastreamer.source.hoodieincr.num_instants";
static final Integer DEFAULT_NUM_INSTANTS_PER_FETCH = 1;
/**
* {@value #HOODIE_SRC_PARTITION_FIELDS} specifies partition fields that needs to be added to source table after
* parsing _hoodie_partition_path.
*/
private static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields";
static final String HOODIE_SRC_PARTITION_FIELDS = "hoodie.deltastreamer.source.hoodieincr.partition.fields";
/**
* {@value #HOODIE_SRC_PARTITION_EXTRACTORCLASS} PartitionValueExtractor class to extract partition fields from
* _hoodie_partition_path.
*/
private static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
static final String HOODIE_SRC_PARTITION_EXTRACTORCLASS =
"hoodie.deltastreamer.source.hoodieincr.partition.extractor.class";
private static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS =
static final String DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS =
SlashEncodedDayPartitionValueExtractor.class.getCanonicalName();
/**
* {@value #READ_LATEST_INSTANT_ON_MISSING_CKPT} allows delta-streamer to incrementally fetch from latest committed
* instant when checkpoint is not provided.
*/
private static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
static final String READ_LATEST_INSTANT_ON_MISSING_CKPT =
"hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt";
private static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
static final Boolean DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT = false;
/**
* {@value #SOURCE_FILE_FORMAT} is passed to the reader while loading dataset. Default value is parquet.
*/
static final String SOURCE_FILE_FORMAT = "hoodie.deltastreamer.source.hoodieincr.file.format";
static final String DEFAULT_SOURCE_FILE_FORMAT = "parquet";
}
public HoodieIncrSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
}
@@ -123,10 +129,10 @@ public class HoodieIncrSource extends RowSource {
/*
* log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
*
*
* StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
* = newSchema.add(field, DataTypes.StringType, true); }
*
*
* /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
* configured
*
@@ -139,7 +145,7 @@ public class HoodieIncrSource extends RowSource {
* "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
* ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
* RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
*
*
* log.info("Validated Source Schema :" + validated.schema());
*/

View File

@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
/**
 * Incremental source that consumes the S3 event metadata written into a hoodie table by
 * {@link S3EventsSource}, resolves the referenced S3 objects, and loads them as the next
 * batch for the sink hoodie table.
 */
public class S3EventsHoodieIncrSource extends HoodieIncrSource {

  private static final Logger LOG = LogManager.getLogger(S3EventsHoodieIncrSource.class);

  static class Config {
    // control whether we do existence check for files before consuming them
    static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
    static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
  }

  public S3EventsHoodieIncrSource(
      TypedProperties props,
      JavaSparkContext sparkContext,
      SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
  }

  /**
   * Reads S3 event metadata committed to the meta table since the last checkpoint and
   * loads the corresponding S3 objects.
   *
   * @param lastCkptStr last checkpoint instant string, empty on the first run
   * @param sourceLimit unused by this source
   * @return dataset of rows read from the S3 objects (absent when caught up or nothing
   *         to read) paired with the next checkpoint instant string
   */
  @Override
  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
    String metaTablePath = props.getString(HOODIE_SRC_BASE_PATH);
    int instantsPerFetch = props.getInteger(NUM_INSTANTS_PER_FETCH, DEFAULT_NUM_INSTANTS_PER_FETCH);
    boolean fallBackToLatestOnMissingCkpt = props.getBoolean(
        READ_LATEST_INSTANT_ON_MISSING_CKPT, DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT);

    // A present-but-empty checkpoint is treated the same as a missing one.
    Option<String> resumeInstant = Option.empty();
    if (lastCkptStr.isPresent() && !lastCkptStr.get().isEmpty()) {
      resumeInstant = lastCkptStr;
    }

    Pair<String, String> queryRange =
        IncrSourceHelper.calculateBeginAndEndInstants(
            sparkContext, metaTablePath, instantsPerFetch, resumeInstant, fallBackToLatestOnMissingCkpt);

    if (queryRange.getKey().equals(queryRange.getValue())) {
      LOG.warn("Already caught up. Begin Checkpoint was :" + queryRange.getKey());
      return Pair.of(Option.empty(), queryRange.getKey());
    }

    // Incrementally pull the S3 event metadata between the two instants.
    Dataset<Row> metaRows = sparkSession.read().format("org.apache.hudi")
        .option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
        .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), queryRange.getLeft())
        .option(DataSourceReadOptions.END_INSTANTTIME().key(), queryRange.getRight())
        .load(metaTablePath);

    // Distinct (bucket, key) pairs of the non-empty objects recorded in the meta table.
    final List<Row> bucketKeyRows = metaRows
        .filter("s3.object.size > 0")
        .select("s3.bucket.name", "s3.object.key")
        .distinct()
        .collectAsList();

    final boolean verifyExistence = props.getBoolean(Config.ENABLE_EXISTS_CHECK, Config.DEFAULT_ENABLE_EXISTS_CHECK);
    List<String> s3Paths = new ArrayList<>();
    for (Row bucketKey : bucketKeyRows) {
      // column 0 holds the bucket name, column 1 the object key
      String bucketName = bucketKey.getString(0);
      String objectPath = S3_PREFIX + bucketName + "/" + bucketKey.getString(1);
      if (!verifyExistence) {
        s3Paths.add(objectPath);
        continue;
      }
      FileSystem fs = FSUtils.getFs(S3_PREFIX + bucketName, sparkSession.sparkContext().hadoopConfiguration());
      try {
        if (fs.exists(new Path(objectPath))) {
          s3Paths.add(objectPath);
        }
      } catch (IOException e) {
        // Skip the missing/unreadable object but keep processing the rest of the batch.
        LOG.error(String.format("Error while checking path exists for %s ", objectPath), e);
      }
    }

    String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
    Option<Dataset<Row>> nextBatch = Option.empty();
    if (!s3Paths.isEmpty()) {
      nextBatch = Option.of(sparkSession.read().format(fileFormat).load(s3Paths.toArray(new String[0])));
    }
    return Pair.of(nextBatch, queryRange.getRight());
  }
}

View File

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.S3EventsMetaSelector;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
/**
 * This source provides capability to create the hudi table for S3 events metadata (eg. S3
 * put events data). It will use the SQS for receiving the object key events. This can be useful
 * to check S3 files activity over time. The hudi table created by this source is consumed by
 * {@link S3EventsHoodieIncrSource} to apply changes to the hudi table corresponding to user data.
 */
public class S3EventsSource extends RowSource {

  // Selector that pulls and filters raw event messages from the SQS queue.
  private final S3EventsMetaSelector pathSelector;
  // Messages consumed in the current batch; deleted from the queue on commit.
  private final List<Message> processedMessages = new ArrayList<>();
  AmazonSQS sqs;

  public S3EventsSource(
      TypedProperties props,
      JavaSparkContext sparkContext,
      SparkSession sparkSession,
      SchemaProvider schemaProvider) {
    super(props, sparkContext, sparkSession, schemaProvider);
    this.pathSelector = S3EventsMetaSelector.createSourceSelector(props);
    this.sqs = this.pathSelector.createAmazonSqsClient();
  }

  /**
   * Fetches next events from the queue.
   *
   * @param lastCkptStr The last checkpoint instant string, empty if first run.
   * @param sourceLimit Limit on the size of data to fetch. For {@link S3EventsSource},
   *     {@link org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config#S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is used.
   * @return A pair of dataset of event records and the next checkpoint instant string
   */
  @Override
  public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
    Pair<List<String>, String> eventsAndCheckpoint =
        pathSelector.getNextEventsFromQueue(sqs, lastCkptStr, processedMessages);
    List<String> eventJsonRecords = eventsAndCheckpoint.getLeft();
    String nextCheckpoint = eventsAndCheckpoint.getRight();
    if (eventJsonRecords.isEmpty()) {
      // Nothing new in the queue; just advance the checkpoint.
      return Pair.of(Option.empty(), nextCheckpoint);
    }
    // Each element is a JSON string for one S3 event; parse them into rows.
    Dataset<String> jsonDataset = sparkSession.createDataset(eventJsonRecords, Encoders.STRING());
    return Pair.of(Option.of(sparkSession.read().json(jsonDataset)), nextCheckpoint);
  }

  /**
   * Removes the messages consumed in this batch from the SQS queue once the
   * corresponding hudi commit has succeeded.
   */
  @Override
  public void onCommit(String lastCkptStr) {
    pathSelector.deleteProcessedMessages(sqs, pathSelector.queueUrl, processedMessages);
    processedMessages.clear();
  }
}

View File

@@ -0,0 +1,280 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.common.config.TypedProperties;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.BatchResultErrorEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.json.JSONObject;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * This class has methods for processing cloud objects.
 * It currently supports only AWS S3 objects and AWS SQS queue.
 */
public class CloudObjectsSelector {
  // Only object-creation events are eligible for ingestion.
  public static final List<String> ALLOWED_S3_EVENT_PREFIX =
      Collections.singletonList("ObjectCreated");
  public static final String S3_PREFIX = "s3://";
  // NOTE(review): public mutable logger kept as-is for compatibility — subclasses reference it.
  public static volatile Logger log = LogManager.getLogger(CloudObjectsSelector.class);
  // SQS queue attribute holding the (eventually consistent) approximate message count.
  public static final String SQS_ATTR_APPROX_MESSAGES = "ApproximateNumberOfMessages";
  static final String SQS_MODEL_MESSAGE = "Message";
  static final String SQS_MODEL_EVENT_RECORDS = "Records";
  static final String SQS_MODEL_EVENT_NAME = "eventName";
  static final String S3_MODEL_EVENT_TIME = "eventTime";
  static final String S3_FILE_SIZE = "fileSize";
  static final String S3_FILE_PATH = "filePath";
  public final String queueUrl;
  public final int longPollWait;
  public final int maxMessagesPerRequest;
  public final int maxMessagePerBatch;
  public final int visibilityTimeout;
  public final TypedProperties props;
  public final String fsName;
  private final String regionName;

  /**
   * Cloud Objects Selector Class. {@link CloudObjectsSelector}
   *
   * @param props must contain the SQS queue url and region; other settings are optional
   */
  public CloudObjectsSelector(TypedProperties props) {
    DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.S3_SOURCE_QUEUE_URL, Config.S3_SOURCE_QUEUE_REGION));
    this.props = props;
    this.queueUrl = props.getString(Config.S3_SOURCE_QUEUE_URL);
    this.regionName = props.getString(Config.S3_SOURCE_QUEUE_REGION);
    this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase();
    this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20);
    this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5);
    this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30);
    // SQS allows at most 10 messages per ReceiveMessage request.
    this.maxMessagesPerRequest = 10;
  }

  /**
   * Get SQS queue attributes.
   *
   * @param sqsClient AWSClient for sqsClient
   * @param queueUrl  queue full url
   * @return map of attributes needed
   */
  protected Map<String, String> getSqsQueueAttributes(AmazonSQS sqsClient, String queueUrl) {
    GetQueueAttributesResult queueAttributesResult = sqsClient.getQueueAttributes(
        new GetQueueAttributesRequest(queueUrl).withAttributeNames(SQS_ATTR_APPROX_MESSAGES)
    );
    return queueAttributesResult.getAttributes();
  }

  /**
   * Get the file attributes filePath, eventTime and size from JSONObject record.
   *
   * @param record of object event
   * @return map of file attribute
   * @throws UnsupportedEncodingException if the bucket/key cannot be URL-decoded as UTF-8
   */
  protected Map<String, Object> getFileAttributesFromRecord(JSONObject record) throws UnsupportedEncodingException {
    Map<String, Object> fileRecord = new HashMap<>();
    String eventTimeStr = record.getString(S3_MODEL_EVENT_TIME);
    // Event time arrives as an ISO-8601 instant; store it as epoch millis.
    long eventTime =
        Date.from(Instant.from(DateTimeFormatter.ISO_INSTANT.parse(eventTimeStr))).getTime();
    JSONObject s3Object = record.getJSONObject("s3").getJSONObject("object");
    // Bucket name and object key are URL-encoded in S3 event notifications.
    String bucket = URLDecoder.decode(record.getJSONObject("s3").getJSONObject("bucket").getString("name"), "UTF-8");
    String key = URLDecoder.decode(s3Object.getString("key"), "UTF-8");
    String filePath = this.fsName + "://" + bucket + "/" + key;
    fileRecord.put(S3_MODEL_EVENT_TIME, eventTime);
    fileRecord.put(S3_FILE_SIZE, s3Object.getLong("size"));
    fileRecord.put(S3_FILE_PATH, filePath);
    return fileRecord;
  }

  /**
   * Amazon SQS Client Builder.
   */
  public AmazonSQS createAmazonSqsClient() {
    return AmazonSQSClientBuilder.standard().withRegion(Regions.fromName(regionName)).build();
  }

  /**
   * List messages from queue.
   *
   * @return up to {@code maxMessagePerBatch} messages, fetched in chunks of
   *         {@code maxMessagesPerRequest}
   */
  protected List<Message> getMessagesToProcess(
      AmazonSQS sqsClient,
      String queueUrl,
      int longPollWait,
      int visibilityTimeout,
      int maxMessagePerBatch,
      int maxMessagesPerRequest) {
    List<Message> messagesToProcess = new ArrayList<>();
    ReceiveMessageRequest receiveMessageRequest =
        new ReceiveMessageRequest()
            .withQueueUrl(queueUrl)
            .withWaitTimeSeconds(longPollWait)
            .withVisibilityTimeout(visibilityTimeout);
    receiveMessageRequest.setMaxNumberOfMessages(maxMessagesPerRequest);
    // Get count for available messages
    Map<String, String> queueAttributesResult = getSqsQueueAttributes(sqsClient, queueUrl);
    long approxMessagesAvailable = Long.parseLong(queueAttributesResult.get(SQS_ATTR_APPROX_MESSAGES));
    log.info("Approximately " + approxMessagesAvailable + " messages available in queue.");
    long numMessagesToProcess = Math.min(approxMessagesAvailable, maxMessagePerBatch);
    // Ceil-divide so the final, possibly partial, chunk is still requested.
    for (int i = 0;
         i < (int) Math.ceil((double) numMessagesToProcess / maxMessagesPerRequest);
         ++i) {
      List<Message> messages = sqsClient.receiveMessage(receiveMessageRequest).getMessages();
      log.debug("Number of messages: " + messages.size());
      messagesToProcess.addAll(messages);
      if (messages.isEmpty()) {
        // ApproximateNumberOfMessages value is eventually consistent.
        // So, we still need to check and break if there are no messages.
        break;
      }
    }
    return messagesToProcess;
  }

  /**
   * Create partitions of list using specific batch size. we can't use third party API for this
   * functionality, due to https://github.com/apache/hudi/blob/master/style/checkstyle.xml#L270
   *
   * @return empty list when the input is empty or the batch size is less than 1
   */
  protected List<List<Message>> createListPartitions(List<Message> singleList, int eachBatchSize) {
    List<List<Message>> listPartitions = new ArrayList<>();
    if (singleList.isEmpty() || eachBatchSize < 1) {
      return listPartitions;
    }
    for (int start = 0; start < singleList.size(); start += eachBatchSize) {
      // end is clamped to the list size, so the last partition may be smaller.
      // (end is always > start here, so no bounds check is needed.)
      int end = Math.min(start + eachBatchSize, singleList.size());
      listPartitions.add(new ArrayList<>(singleList.subList(start, end)));
    }
    return listPartitions;
  }

  /**
   * Delete batch of messages from queue.
   */
  protected void deleteBatchOfMessages(AmazonSQS sqs, String queueUrl, List<Message> messagesToBeDeleted) {
    DeleteMessageBatchRequest deleteBatchReq =
        new DeleteMessageBatchRequest().withQueueUrl(queueUrl);
    List<DeleteMessageBatchRequestEntry> deleteEntries = deleteBatchReq.getEntries();
    for (Message message : messagesToBeDeleted) {
      deleteEntries.add(
          new DeleteMessageBatchRequestEntry()
              .withId(message.getMessageId())
              .withReceiptHandle(message.getReceiptHandle()));
    }
    DeleteMessageBatchResult deleteResult = sqs.deleteMessageBatch(deleteBatchReq);
    // Deletions can partially fail; report how many entries did not go through.
    List<String> deleteFailures =
        deleteResult.getFailed().stream()
            .map(BatchResultErrorEntry::getId)
            .collect(Collectors.toList());
    if (!deleteFailures.isEmpty()) {
      log.warn(
          "Failed to delete "
              + deleteFailures.size()
              + " messages out of "
              + deleteEntries.size()
              + " from queue.");
    } else {
      log.info("Successfully deleted " + deleteEntries.size() + " messages from queue.");
    }
  }

  /**
   * Delete Queue Messages after hudi commit. This method will be invoked by source.onCommit.
   */
  public void deleteProcessedMessages(AmazonSQS sqs, String queueUrl, List<Message> processedMessages) {
    if (!processedMessages.isEmpty()) {
      // create batch for deletion, SQS DeleteMessageBatchRequest only accepts max 10 entries
      List<List<Message>> deleteBatches = createListPartitions(processedMessages, 10);
      for (List<Message> deleteBatch : deleteBatches) {
        deleteBatchOfMessages(sqs, queueUrl, deleteBatch);
      }
    }
  }

  /**
   * Configs supported.
   */
  public static class Config {
    private static final String HOODIE_DELTASTREAMER_S3_SOURCE = "hoodie.deltastreamer.s3.source";
    /**
     * {@value #S3_SOURCE_QUEUE_URL} is the queue url for cloud object events.
     */
    public static final String S3_SOURCE_QUEUE_URL = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.url";
    /**
     * {@value #S3_SOURCE_QUEUE_REGION} is the case-sensitive region name of the cloud provider for the queue. For example, "us-east-1".
     */
    public static final String S3_SOURCE_QUEUE_REGION = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.region";
    /**
     * {@value #S3_SOURCE_QUEUE_FS} is file system corresponding to queue. For example, for AWS SQS it is s3/s3a.
     */
    public static final String S3_SOURCE_QUEUE_FS = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.fs";
    /**
     * {@value #S3_QUEUE_LONG_POLL_WAIT} is the long poll wait time in seconds If set as 0 then
     * client will fetch on short poll basis.
     */
    public static final String S3_QUEUE_LONG_POLL_WAIT =
        HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.long.poll.wait";
    /**
     * {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH} is max messages for each batch of delta streamer
     * run. Source will process these maximum number of message at a time.
     */
    public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH =
        HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch";
    /**
     * {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for messages in queue. After we
     * consume the message, queue will move the consumed messages to in-flight state, these messages
     * can't be consumed again by source for this timeout period.
     */
    public static final String S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT =
        HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.visibility.timeout";
    /**
     * {@value #SOURCE_INPUT_SELECTOR} source input selector.
     */
    public static final String SOURCE_INPUT_SELECTOR = "hoodie.deltastreamer.source.input.selector";
  }
}

View File

@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.json.JSONException;
import org.json.JSONObject;
import java.io.IOException;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
 * S3 events metadata selector class. This class provides methods to process the
 * messages from SQS for {@link org.apache.hudi.utilities.sources.S3EventsSource}.
 */
public class S3EventsMetaSelector extends CloudObjectsSelector {

  private static final String S3_EVENT_RESPONSE_ELEMENTS = "responseElements";
  // ObjectMapper is thread-safe and expensive to create; share a single instance
  // instead of instantiating one per message/record.
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Cloud Objects Meta Selector Class. {@link CloudObjectsSelector}
   */
  public S3EventsMetaSelector(TypedProperties props) {
    super(props);
  }

  /**
   * Factory method for creating custom CloudObjectsMetaSelector. Default selector to use is {@link
   * S3EventsMetaSelector}
   *
   * @throws HoodieException when the configured selector class cannot be loaded
   */
  public static S3EventsMetaSelector createSourceSelector(TypedProperties props) {
    String sourceSelectorClass =
        props.getString(
            S3EventsMetaSelector.Config.SOURCE_INPUT_SELECTOR,
            S3EventsMetaSelector.class.getName());
    try {
      S3EventsMetaSelector selector =
          (S3EventsMetaSelector)
              ReflectionUtils.loadClass(
                  sourceSelectorClass, new Class<?>[] {TypedProperties.class}, props);
      log.info("Using path selector " + selector.getClass().getName());
      return selector;
    } catch (Exception e) {
      throw new HoodieException("Could not load source selector class " + sourceSelectorClass, e);
    }
  }

  /**
   * List messages from queue and filter out ineligible events. Every received message,
   * valid or not, is appended to {@code processedMessages} so that it can be deleted from
   * the queue after a successful commit (see {@code deleteProcessedMessages}).
   *
   * @param processedMessages array of processed messages to add more messages
   * @return the filtered list of valid S3 events in SQS.
   */
  protected List<Map<String, Object>> getValidEvents(AmazonSQS sqs, List<Message> processedMessages) throws IOException {
    List<Message> messages =
        getMessagesToProcess(
            sqs,
            this.queueUrl,
            this.longPollWait,
            this.visibilityTimeout,
            this.maxMessagePerBatch,
            this.maxMessagesPerRequest);
    return processAndDeleteInvalidMessages(processedMessages, messages);
  }

  // The SQS message bodies are free-form JSON; the Map casts below are inherently unchecked.
  @SuppressWarnings("unchecked")
  private List<Map<String, Object>> processAndDeleteInvalidMessages(List<Message> processedMessages,
                                                                    List<Message> messages) throws IOException {
    List<Map<String, Object>> validEvents = new ArrayList<>();
    for (Message message : messages) {
      JSONObject messageBody = new JSONObject(message.getBody());
      Map<String, Object> messageMap;
      if (messageBody.has(SQS_MODEL_MESSAGE)) {
        // If this messages is from S3Event -> SNS -> SQS
        messageMap = (Map<String, Object>) MAPPER.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class);
      } else {
        // If this messages is from S3Event -> SQS
        messageMap = (Map<String, Object>) MAPPER.readValue(messageBody.toString(), Map.class);
      }
      if (messageMap.containsKey(SQS_MODEL_EVENT_RECORDS)) {
        List<Map<String, Object>> events = (List<Map<String, Object>>) messageMap.get(SQS_MODEL_EVENT_RECORDS);
        for (Map<String, Object> event : events) {
          // responseElements carries request metadata that is not needed downstream.
          event.remove(S3_EVENT_RESPONSE_ELEMENTS);
          String eventName = (String) event.get(SQS_MODEL_EVENT_NAME);
          // filter only allowed s3 event types
          if (ALLOWED_S3_EVENT_PREFIX.stream().anyMatch(eventName::startsWith)) {
            validEvents.add(event);
          } else {
            log.debug(String.format("This S3 event %s is not allowed, so ignoring it.", eventName));
          }
        }
      } else {
        log.debug(String.format("Message is not expected format or it's s3:TestEvent. Message: %s", message));
      }
      // Track every message (valid or not) so it gets deleted from the queue on commit.
      processedMessages.add(message);
    }
    return validEvents;
  }

  /**
   * Get the list of events from queue.
   *
   * @param lastCheckpointStr The last checkpoint instant string, empty if first run.
   * @return A pair of dataset of event records and the next checkpoint instant string.
   *     The checkpoint is the max event time seen in this batch, falling back to the
   *     previous checkpoint (or 0) when no events were read.
   */
  public Pair<List<String>, String> getNextEventsFromQueue(AmazonSQS sqs,
                                                           Option<String> lastCheckpointStr,
                                                           List<Message> processedMessages) {
    processedMessages.clear();
    log.info("Reading messages....");
    try {
      log.info("Start Checkpoint : " + lastCheckpointStr);
      List<Map<String, Object>> eventRecords = getValidEvents(sqs, processedMessages);
      log.info("Number of valid events: " + eventRecords.size());
      List<String> filteredEventRecords = new ArrayList<>();
      long newCheckpointTime = eventRecords.stream()
          .mapToLong(eventRecord -> Date.from(Instant.from(
              DateTimeFormatter.ISO_INSTANT.parse((String) eventRecord.get(S3_MODEL_EVENT_TIME))))
              .getTime()).max().orElse(lastCheckpointStr.map(Long::parseLong).orElse(0L));
      for (Map<String, Object> eventRecord : eventRecords) {
        // "%3D" is the URL-encoding of '='; restore it in the serialized record.
        filteredEventRecords.add(MAPPER.writeValueAsString(eventRecord).replace("%3D", "="));
      }
      return new ImmutablePair<>(filteredEventRecords, String.valueOf(newCheckpointTime));
    } catch (JSONException | IOException e) {
      throw new HoodieException("Unable to read from SQS: ", e);
    }
  }
}

View File

@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter;
import org.apache.hudi.utilities.testutils.sources.AbstractCloudObjectsSourceTestBase;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_FS;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Basic tests for {@link S3EventsSource}.
*/
public class TestS3EventsSource extends AbstractCloudObjectsSourceTestBase {

  @BeforeEach
  public void setup() throws Exception {
    super.setup();
    this.dfsRoot = dfsBasePath + "/parquetFiles";
    this.fileSuffix = ".parquet";
    dfs.mkdirs(new Path(dfsRoot));
  }

  @AfterEach
  public void teardown() throws Exception {
    super.teardown();
  }

  /**
   * Exercises the end-to-end read path of the source: an empty queue, a first
   * batch without a checkpoint, then an incremental batch resumed from the
   * checkpoint of the first fetch.
   *
   * @throws IOException on failures while staging test data
   */
  @Test
  public void testReadingFromSource() throws IOException {
    SourceFormatAdapter adapter = new SourceFormatAdapter(prepareCloudObjectSource());

    // Step 1: queue is empty, so a checkpoint-less fetch yields no batch.
    generateMessageInQueue(null);
    assertEquals(
        Option.empty(),
        adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());

    // Step 2: one event staged in the queue; a checkpoint-less fetch returns it.
    generateMessageInQueue("1");
    InputBatch<JavaRDD<GenericRecord>> firstFetch =
        adapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
    assertEquals(1, firstFetch.getBatch().get().count());

    // Step 3: a second event arrives; resuming from the first checkpoint
    // returns only the new record, whose s3.object.key matches the new file.
    generateMessageInQueue("2");
    InputBatch<JavaRDD<GenericRecord>> secondFetch =
        adapter.fetchNewDataInAvroFormat(
            Option.of(firstFetch.getCheckpointForNextBatch()), Long.MAX_VALUE);
    assertEquals(1, secondFetch.getBatch().get().count());
    GenericRecord s3Payload = (GenericRecord) secondFetch.getBatch().get().rdd().first().get("s3");
    GenericRecord objectPayload = (GenericRecord) s3Payload.get("object");
    assertEquals("2.parquet", objectPayload.get("key").toString());
  }

  @Override
  public Source prepareCloudObjectSource() {
    TypedProperties properties = new TypedProperties();
    properties.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
    properties.setProperty(S3_SOURCE_QUEUE_REGION, regionName);
    properties.setProperty(S3_SOURCE_QUEUE_FS, "hdfs");
    S3EventsSource source = new S3EventsSource(properties, jsc, sparkSession, null);
    // Swap in the mocked SQS client so no real AWS calls are made.
    source.sqs = this.sqs;
    return source;
  }

  @Override
  public void writeNewDataToFile(List<HoodieRecord> records, Path path) throws IOException {
    Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path);
  }
}

View File

@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_PATH;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_FILE_SIZE;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_MODEL_EVENT_TIME;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_EVENT_RECORDS;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_MODEL_MESSAGE;
import static org.apache.hudi.utilities.testutils.CloudObjectTestUtils.deleteMessagesInQueue;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Unit tests for {@link CloudObjectsSelector}, driven against a Mockito-mocked
 * SQS client so no real AWS calls are made.
 */
public class TestCloudObjectsSelector extends HoodieClientTestHarness {

  // Region used for all selector configs here; also reused by TestS3EventsMetaSelector.
  static final String REGION_NAME = "us-east-1";

  TypedProperties props;
  String sqsUrl;

  // Mocked SQS client; responses are stubbed per-test via CloudObjectTestUtils.
  @Mock
  AmazonSQS sqs;

  @Mock
  private CloudObjectsSelector cloudObjectsSelector;

  @BeforeEach
  void setUp() {
    initSparkContexts();
    initPath();
    initFileSystem();
    MockitoAnnotations.initMocks(this);
    // Minimal configuration a CloudObjectsSelector needs: queue URL and region.
    props = new TypedProperties();
    sqsUrl = "test-queue";
    props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
    props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME);
  }

  @AfterEach
  public void teardown() throws Exception {
    Mockito.reset(cloudObjectsSelector);
    cleanupResources();
  }

  // Verifies getSqsQueueAttributes() returns only the attribute the selector needs.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testSqsQueueAttributesShouldReturnsRequiredAttribute(Class<?> clazz) {
    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
    // setup the queue attributes (empty queue -> ApproximateNumberOfMessages = "0")
    CloudObjectTestUtils.setMessagesInQueue(sqs, null);
    // test the return values
    Map<String, String> queueAttributes = selector.getSqsQueueAttributes(sqs, sqsUrl);
    assertEquals(1, queueAttributes.size());
    // ApproximateNumberOfMessages is a required queue attribute for Cloud object selector
    assertEquals("0", queueAttributes.get(SQS_ATTR_APPROX_MESSAGES));
  }

  // Verifies getFileAttributesFromRecord() extracts path, size and event time
  // from a raw SNS-wrapped S3 "ObjectCreated" notification.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testFileAttributesFromRecordShouldReturnsExpectOutput(Class<?> clazz)
      throws IOException {
    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
    // setup s3 record: SNS envelope whose "Message" field is an escaped JSON
    // string carrying the S3 event Records array.
    String bucket = "test-bucket";
    String key = "test/year=test1/month=test2/day=test3/part-foo-bar.snappy.parquet";
    String s3Records =
        "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"1\",\n \"TopicArn\" : \"arn:aws:sns:foo:123:"
            + "foo-bar\",\n \"Subject\" : \"Amazon S3 Notification\",\n \"Message\" : \"{\\\"Records\\\":"
            + "[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us"
            + "-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated"
            + ":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":"
            + "{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\""
            + "test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\""
            + "configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\""
            + bucket
            + "\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}"
            + ",\\\"object\\\":{\\\"key\\\":\\\""
            + key
            + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
    JSONObject messageBody = new JSONObject(s3Records);
    Map<String, Object> messageMap = new HashMap<>();
    // Unwrap the inner "Message" payload into a plain map, mirroring what the
    // selector does when reading from the queue.
    if (messageBody.has(SQS_MODEL_MESSAGE)) {
      ObjectMapper mapper = new ObjectMapper();
      messageMap =
          (Map<String, Object>) mapper.readValue(messageBody.getString(SQS_MODEL_MESSAGE), Map.class);
    }
    List<Map<String, Object>> records = (List<Map<String, Object>>) messageMap.get(SQS_MODEL_EVENT_RECORDS);
    // test the return values: exactly path, size and event time are extracted
    Map<String, Object> fileAttributes =
        selector.getFileAttributesFromRecord(new JSONObject(records.get(0)));
    assertEquals(3, fileAttributes.size());
    assertEquals(123L, (long) fileAttributes.get(S3_FILE_SIZE));
    assertEquals(S3_PREFIX + bucket + "/" + key, fileAttributes.get(S3_FILE_PATH));
    // 2021-07-27T09:05:36.755Z as epoch millis
    assertEquals(1627376736755L, (long) fileAttributes.get(S3_MODEL_EVENT_TIME));
  }

  // Verifies createListPartitions() splits 5 messages into batches of 2, 2 and 1.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testCreateListPartitionsReturnsExpectedSetOfBatch(Class<?> clazz) {
    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
    // setup lists
    List<Message> testSingleList = new ArrayList<>();
    testSingleList.add(new Message().addAttributesEntry("id", "1"));
    testSingleList.add(new Message().addAttributesEntry("id", "2"));
    testSingleList.add(new Message().addAttributesEntry("id", "3"));
    testSingleList.add(new Message().addAttributesEntry("id", "4"));
    testSingleList.add(new Message().addAttributesEntry("id", "5"));
    List<Message> expectedFirstList = new ArrayList<>();
    expectedFirstList.add(new Message().addAttributesEntry("id", "1"));
    expectedFirstList.add(new Message().addAttributesEntry("id", "2"));
    List<Message> expectedSecondList = new ArrayList<>();
    expectedSecondList.add(new Message().addAttributesEntry("id", "3"));
    expectedSecondList.add(new Message().addAttributesEntry("id", "4"));
    List<Message> expectedFinalList = new ArrayList<>();
    expectedFinalList.add(new Message().addAttributesEntry("id", "5"));
    // test the return values: order of messages is preserved across batches
    List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 2);
    assertEquals(3, partitionedList.size());
    assertEquals(expectedFirstList, partitionedList.get(0));
    assertEquals(expectedSecondList, partitionedList.get(1));
    assertEquals(expectedFinalList, partitionedList.get(2));
  }

  // Verifies a non-positive batch size yields no partitions rather than failing.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testCreateListPartitionsReturnsEmptyIfBatchSizeIsZero(Class<?> clazz) {
    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
    // setup lists
    List<Message> testSingleList = new ArrayList<>();
    testSingleList.add(new Message().addAttributesEntry("id", "1"));
    testSingleList.add(new Message().addAttributesEntry("id", "2"));
    // test the return values
    List<List<Message>> partitionedList = selector.createListPartitions(testSingleList, 0);
    assertEquals(0, partitionedList.size());
  }

  // Smoke test: deleteProcessedMessages() issues batch deletes against the
  // (mocked) queue without throwing. No return value to assert on.
  @ParameterizedTest
  @ValueSource(classes = {CloudObjectsSelector.class})
  public void testOnCommitDeleteProcessedMessages(Class<?> clazz) {
    CloudObjectsSelector selector =
        (CloudObjectsSelector) ReflectionUtils.loadClass(clazz.getName(), props);
    // setup lists
    List<Message> testSingleList = new ArrayList<>();
    testSingleList.add(
        new Message()
            .addAttributesEntry("MessageId", "1")
            .addAttributesEntry("ReceiptHandle", "1"));
    testSingleList.add(
        new Message()
            .addAttributesEntry("MessageId", "2")
            .addAttributesEntry("ReceiptHandle", "1"));
    deleteMessagesInQueue(sqs);
    // test the return values
    selector.deleteProcessedMessages(sqs, sqsUrl, testSingleList);
  }
}

View File

@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.sources.helpers;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.testutils.HoodieClientTestHarness;
import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import org.apache.hadoop.fs.Path;
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_REGION;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.Config.S3_SOURCE_QUEUE_URL;
import static org.apache.hudi.utilities.sources.helpers.TestCloudObjectsSelector.REGION_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
 * Tests for {@link S3EventsMetaSelector}, run against a Mockito-mocked SQS
 * client so no real AWS calls are made.
 */
public class TestS3EventsMetaSelector extends HoodieClientTestHarness {

  TypedProperties props;
  String sqsUrl;

  // Mocked SQS client; responses are stubbed via CloudObjectTestUtils.
  @Mock
  AmazonSQS sqs;

  @Mock
  private S3EventsMetaSelector s3EventsMetaSelector;

  @BeforeEach
  void setUp() {
    initSparkContexts();
    initPath();
    initFileSystem();
    MockitoAnnotations.initMocks(this);
    // Minimal selector configuration: queue URL plus region.
    props = new TypedProperties();
    sqsUrl = "test-queue";
    props.setProperty(S3_SOURCE_QUEUE_URL, sqsUrl);
    props.setProperty(S3_SOURCE_QUEUE_REGION, REGION_NAME);
  }

  @AfterEach
  public void teardown() throws Exception {
    Mockito.reset(s3EventsMetaSelector);
    cleanupResources();
  }

  // Verifies getNextEventsFromQueue() returns the staged event, records the
  // processed message, and advances the checkpoint to the event time.
  @ParameterizedTest
  @ValueSource(classes = {S3EventsMetaSelector.class})
  public void testNextEventsFromQueueShouldReturnsEventsFromQueue(Class<?> clazz) {
    S3EventsMetaSelector selector = (S3EventsMetaSelector) ReflectionUtils.loadClass(clazz.getName(), props);
    // Stage a single S3 "ObjectCreated" event for bucket/key in the mocked queue.
    String bucket = "test-bucket";
    String key = "part-foo-bar.snappy.parquet";
    CloudObjectTestUtils.setMessagesInQueue(sqs, new Path(bucket, key));
    List<Message> processedMessages = new ArrayList<>();
    // Fetch with no prior checkpoint.
    Pair<List<String>, String> eventsAndCheckpoint =
        selector.getNextEventsFromQueue(sqs, Option.empty(), processedMessages);
    // Exactly one event record and one processed message.
    assertEquals(1, eventsAndCheckpoint.getLeft().size());
    assertEquals(1, processedMessages.size());
    // The serialized record preserves the S3 object key.
    JSONObject firstEvent = new JSONObject(eventsAndCheckpoint.getLeft().get(0));
    assertEquals(key, firstEvent.getJSONObject("s3").getJSONObject("object").getString("key"));
    // Checkpoint equals the staged event time as epoch millis.
    assertEquals("1627376736755", eventsAndCheckpoint.getRight());
  }
}

View File

@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.utilities.testutils;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.List;
import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.SQS_ATTR_APPROX_MESSAGES;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
/**
* Utils Class for unit testing on CloudObject related sources.
*/
public class CloudObjectTestUtils {

  /**
   * Set a return value for mocked sqs instance. It will add a new messages (s3 Event) and set
   * ApproximateNumberOfMessages attribute of the queue.
   *
   * <p>When {@code path} is null the queue is stubbed as empty (no messages,
   * ApproximateNumberOfMessages = "0"); otherwise a single SNS-wrapped
   * "ObjectCreated:Copy" S3 notification for {@code path} is staged.
   *
   * @param sqs Mocked instance of AmazonSQS
   * @param path Absolute Path of file in FileSystem
   */
  public static void setMessagesInQueue(AmazonSQS sqs, Path path) {
    ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult();
    String approximateNumberOfMessages = "0";
    if (path != null) {
      // SNS envelope whose "Message" field is an escaped JSON string carrying the
      // S3 event Records array. The bucket name is derived from the path's parent
      // (scheme stripped) and the object key from the path's file name. The fixed
      // eventTime 2021-07-27T09:05:36.755Z (= 1627376736755 ms) is asserted on by tests.
      String body =
          "{\n \"Type\" : \"Notification\",\n \"MessageId\" : \"1\",\n \"TopicArn\" : \"arn:aws:sns:foo:123:"
              + "foo-bar\",\n \"Subject\" : \"Amazon S3 Notification\",\n \"Message\" : \"{\\\"Records\\\":"
              + "[{\\\"eventVersion\\\":\\\"2.1\\\",\\\"eventSource\\\":\\\"aws:s3\\\",\\\"awsRegion\\\":\\\"us"
              + "-west-2\\\",\\\"eventTime\\\":\\\"2021-07-27T09:05:36.755Z\\\",\\\"eventName\\\":\\\"ObjectCreated"
              + ":Copy\\\",\\\"userIdentity\\\":{\\\"principalId\\\":\\\"AWS:test\\\"},\\\"requestParameters\\\":"
              + "{\\\"sourceIPAddress\\\":\\\"0.0.0.0\\\"},\\\"responseElements\\\":{\\\"x-amz-request-id\\\":\\\""
              + "test\\\",\\\"x-amz-id-2\\\":\\\"foobar\\\"},\\\"s3\\\":{\\\"s3SchemaVersion\\\":\\\"1.0\\\",\\\""
              + "configurationId\\\":\\\"foobar\\\",\\\"bucket\\\":{\\\"name\\\":\\\""
              + path.getParent().toString().replace("hdfs://", "")
              + "\\\",\\\"ownerIdentity\\\":{\\\"principalId\\\":\\\"foo\\\"},\\\"arn\\\":\\\"arn:aws:s3:::foo\\\"}"
              + ",\\\"object\\\":{\\\"key\\\":\\\""
              + path.getName()
              + "\\\",\\\"size\\\":123,\\\"eTag\\\":\\\"test\\\",\\\"sequencer\\\":\\\"1\\\"}}}]}\"}";
      Message message = new Message();
      message.setReceiptHandle("1");
      message.setMessageId("1");
      message.setBody(body);
      List<Message> messages = new ArrayList<>();
      messages.add(message);
      receiveMessageResult.setMessages(messages);
      approximateNumberOfMessages = "1";
    }
    // Stub both the message fetch and the queue-attribute lookup so selectors
    // see a consistent view of the queue depth.
    when(sqs.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(receiveMessageResult);
    when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class)))
        .thenReturn(
            new GetQueueAttributesResult()
                .addAttributesEntry(SQS_ATTR_APPROX_MESSAGES, approximateNumberOfMessages));
  }

  /**
   * Mock the sqs.deleteMessageBatch() method from queue.
   *
   * <p>Batch deletes succeed with an empty result so commit-time cleanup code
   * can run without real AWS calls.
   *
   * @param sqs Mocked instance of AmazonSQS
   */
  public static void deleteMessagesInQueue(AmazonSQS sqs) {
    when(sqs.deleteMessageBatch(any(DeleteMessageBatchRequest.class)))
        .thenReturn(new DeleteMessageBatchResult());
  }
}

View File

@@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.utilities.testutils.sources;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.testutils.CloudObjectTestUtils;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import com.amazonaws.services.sqs.AmazonSQS;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.IOException;
import java.util.List;
/**
* An abstract test base for {@link Source} using CloudObjects as the file system.
*/
public abstract class AbstractCloudObjectsSourceTestBase extends UtilitiesTestBase {

  protected FilebasedSchemaProvider schemaProvider;
  protected String dfsRoot;
  protected String fileSuffix;
  protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
  protected boolean useFlattenedSchema = false;
  protected String sqsUrl = "test-queue";
  protected String regionName = "us-east-1";

  // Mocked SQS client injected into the source under test; no real AWS traffic.
  @Mock
  protected AmazonSQS sqs;

  @BeforeAll
  public static void initClass() throws Exception {
    UtilitiesTestBase.initClass();
  }

  @AfterAll
  public static void cleanupClass() {
    UtilitiesTestBase.cleanupClass();
  }

  @BeforeEach
  public void setup() throws Exception {
    super.setup();
    schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc);
    MockitoAnnotations.initMocks(this);
  }

  @AfterEach
  public void teardown() throws Exception {
    super.teardown();
  }

  /**
   * Builds the concrete {@link Source} under test, wired with whatever
   * configuration it requires.
   *
   * @return A {@link Source} using DFS as the file system.
   */
  protected abstract Source prepareCloudObjectSource();

  /**
   * Persists a batch of {@link HoodieRecord}s to a file on DFS.
   *
   * @param records Test data.
   * @param path The path in {@link Path} of the file to write.
   */
  protected abstract void writeNewDataToFile(List<HoodieRecord> records, Path path)
      throws IOException;

  /**
   * Generates {@code n} insert records for the given commit time and writes
   * them to a single file named {@code filename + fileSuffix} under the test root.
   *
   * @param filename The name of the file.
   * @param instantTime The commit time.
   * @param n The number of records to generate.
   * @return The file path.
   */
  protected Path generateOneFile(String filename, String instantTime, int n) throws IOException {
    Path targetPath = new Path(dfsRoot, filename + fileSuffix);
    writeNewDataToFile(dataGenerator.generateInserts(instantTime, n, useFlattenedSchema), targetPath);
    return targetPath;
  }

  /**
   * Stages an S3 event for the given file in the mocked queue, or marks the
   * queue empty when {@code filename} is null.
   */
  public void generateMessageInQueue(String filename) {
    Path path = (filename == null) ? null : new Path(dfsRoot, filename + fileSuffix);
    CloudObjectTestUtils.setMessagesInQueue(sqs, path);
  }
}

View File

@@ -153,6 +153,7 @@
<shadeSources>true</shadeSources>
<zk-curator.version>2.7.1</zk-curator.version>
<antlr.version>4.7</antlr.version>
<aws.sdk.version>1.12.22</aws.sdk.version>
</properties>
<scm>