1
0

[HUDI-3009] making some fixes to S3 incremental source (#4517)

This commit is contained in:
Sivabalan Narayanan
2022-01-09 12:46:52 -05:00
committed by GitHub
parent 977d3c6dad
commit 604d9885f1
2 changed files with 22 additions and 3 deletions

View File

@@ -66,6 +66,11 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
// control whether to filter the s3 objects starting with this prefix
static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";
// control whether to ignore the s3 objects starting with this prefix
static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
// control whether to ignore the s3 objects with this substring
static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring";
}
public S3EventsHoodieIncrSource(
@@ -88,6 +93,7 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
if (readLatestOnMissingCkpt) {
missingCheckpointStrategy = IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST;
}
String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
// Use begin Instant if set and non-empty
Option<String> beginInstant =
@@ -119,6 +125,14 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_PREFIX))) {
filter = filter + " and s3.object.key not like '" + props.getString(Config.S3_IGNORE_KEY_PREFIX) + "%'";
}
if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_IGNORE_KEY_SUBSTRING))) {
filter = filter + " and s3.object.key not like '%" + props.getString(Config.S3_IGNORE_KEY_SUBSTRING) + "%'";
}
// add file format filtering by default
filter = filter + " and s3.object.key like '%" + fileFormat + "%'";
String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
String s3Prefix = s3FS + "://";
@@ -149,7 +163,6 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
cloudFiles.add(filePath);
}
}
String fileFormat = props.getString(SOURCE_FILE_FORMAT, DEFAULT_SOURCE_FILE_FORMAT);
Option<Dataset<Row>> dataset = Option.empty();
if (!cloudFiles.isEmpty()) {
dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0])));

View File

@@ -66,8 +66,8 @@ public class CloudObjectsSelector {
static final String S3_FILE_PATH = "filePath";
public final String queueUrl;
public final int longPollWait;
public final int maxMessagesPerRequest;
public final int maxMessagePerBatch;
public final int maxMessagesPerRequest;
public final int visibilityTimeout;
public final TypedProperties props;
public final String fsName;
@@ -84,8 +84,8 @@ public class CloudObjectsSelector {
this.fsName = props.getString(Config.S3_SOURCE_QUEUE_FS, "s3").toLowerCase();
this.longPollWait = props.getInteger(Config.S3_QUEUE_LONG_POLL_WAIT, 20);
this.maxMessagePerBatch = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH, 5);
this.maxMessagesPerRequest = props.getInteger(Config.S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST, 10);
this.visibilityTimeout = props.getInteger(Config.S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT, 30);
this.maxMessagesPerRequest = 10;
}
/**
@@ -264,6 +264,12 @@ public class CloudObjectsSelector {
public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_BATCH =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.batch";
/**
* {@value #S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST} is max messages for each request.
*/
public static final String S3_SOURCE_QUEUE_MAX_MESSAGES_PER_REQUEST =
HOODIE_DELTASTREAMER_S3_SOURCE + ".queue.max.messages.per.request";
/**
* {@value #S3_SOURCE_QUEUE_VISIBILITY_TIMEOUT} is visibility timeout for messages in queue. After we
* consume the message, queue will move the consumed messages to in-flight state, these messages