diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java index 769bd3280..434e14a80 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/S3EventsHoodieIncrSource.java @@ -66,6 +66,11 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { // control whether to filter the s3 objects starting with this prefix static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix"; static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix"; + + // control whether to ignore the s3 objects starting with this prefix + static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix"; + // control whether to ignore the s3 objects with this substring + static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring"; } public S3EventsHoodieIncrSource( @@ -88,6 +93,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { if (readLatestOnMissingCkpt) { missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST; } + String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT); // Use begin Instant if set and non-empty Option beginInstant = @@ -119,6 +125,14 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) { filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'"; } + if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX))) { + filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'"; + } + if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING))) { + filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'"; + } + // add file format filtering by default + filter = filter + " and s3.object.key like '%" + fileFormat + "%'"; String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase(); String s3Prefix = s3FS + "://"; @@ -149,7 +163,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource { cloudFiles.add(filePath); } } - String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT); Option> dataset = Option.empty(); if (!cloudFiles.isEmpty()) { dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0]))); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java index 7252494d9..33c67e4a8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java @@ -66,8 +66,8 @@ public class CloudObjectsSelector { static final String S3_FILE_PATH = "filePath"; public final String queueUrl; public final int longPollWait; - public final int maxMessagesPerRequest; public final int maxMessagePerBatch; + public final int maxMessagesPerRequest; public final int visibilityTimeout; public final TypedProperties props; public final String fsName; @@ -84,8 +84,8 @@ public class CloudObjectsSelector { this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase(); this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20); this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5); + this.maxMessagesPerRequest = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST, 10); this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30); - this.maxMessagesPerRequest = 10; } /** @@ -264,6 +264,12 @@ public class CloudObjectsSelector { public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH = HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch"; + /** + * {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST} is max messages for each request. + */ + public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST = + HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.request"; + /** * {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for messages in queue. After we * consume the message, queue will move the consumed messages to in-flight state, these messages