[HUDI-2742] Added S3 object filter to support multiple S3EventsHoodieIncrSources using a single S3 meta table (#4025)

Harsha Teja Kanna
2021-11-20 03:24:21 -06:00
committed by GitHub
parent 6cc97cc0c9
commit f4b974ac7b
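
The point of the change is to let several S3EventsHoodieIncrSource pipelines consume the same S3 events meta table, each restricted to its own object-key prefix. A minimal sketch of how the new properties might be set per pipeline, assuming hypothetical prefix values and reusing Hudi's TypedProperties (only the property names come from this commit; everything else is illustrative):

    // Pipeline A: only ingest objects under warehouse/orders/
    TypedProperties ordersProps = new TypedProperties();
    ordersProps.setProperty("hoodie.deltastreamer.source.s3incr.key.prefix", "warehouse/orders/");

    // Pipeline B: same S3 events meta table, different key prefix
    TypedProperties customersProps = new TypedProperties();
    customersProps.setProperty("hoodie.deltastreamer.source.s3incr.key.prefix", "warehouse/customers/");

    // Optional: build file paths with the s3a scheme instead of the default "s3"
    ordersProps.setProperty("hoodie.deltastreamer.source.s3incr.fs.prefix", "s3a");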


@@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
@@ -49,7 +50,6 @@ import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_S
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
 import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
-import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelector.S3_PREFIX;
 
 /**
  * This source will use the S3 events meta information from the hoodie table generated by {@link S3EventsSource}.
@@ -62,6 +62,10 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     // control whether we do existence check for files before consuming them
     static final String ENABLE_EXISTS_CHECK = "hoodie.deltastreamer.source.s3incr.check.file.exists";
     static final Boolean DEFAULT_ENABLE_EXISTS_CHECK = false;
+
+    // control whether to filter the s3 objects starting with this prefix
+    static final String S3_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.key.prefix";
+    static final String S3_FS_PREFIX = "hoodie.deltastreamer.source.s3incr.fs.prefix";
   }
 
   public S3EventsHoodieIncrSource(
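
The two new keys are read in the hunk below: hoodie.deltastreamer.source.s3incr.key.prefix narrows ingestion to objects whose key starts with the given prefix, and hoodie.deltastreamer.source.s3incr.fs.prefix selects the filesystem scheme for the constructed paths, defaulting to "s3" so existing behavior is unchanged. A small sketch of the scheme derivation, with a hypothetical property value:

    // Sketch only; the value "S3A" is hypothetical, the lower-casing mirrors the code below.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.s3incr.fs.prefix", "S3A");
    String s3FS = props.getString("hoodie.deltastreamer.source.s3incr.fs.prefix", "s3").toLowerCase();
    String s3Prefix = s3FS + "://";   // -> "s3a://"; falls back to "s3://" when the key is unset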
@@ -101,9 +105,18 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
         .option(DataSourceReadOptions.BEGIN_INSTANTTIME().key(), instantEndpts.getLeft())
         .option(DataSourceReadOptions.END_INSTANTTIME().key(), instantEndpts.getRight());
     Dataset<Row> source = metaReader.load(srcPath);
+
+    String filter = "s3.object.size > 0";
+    if (!StringUtils.isNullOrEmpty(props.getString(Config.S3_KEY_PREFIX))) {
+      filter = filter + " and s3.object.key like '" + props.getString(Config.S3_KEY_PREFIX) + "%'";
+    }
+
+    String s3FS = props.getString(Config.S3_FS_PREFIX, "s3").toLowerCase();
+    String s3Prefix = s3FS + "://";
+
     // Extract distinct file keys from s3 meta hoodie table
     final List<Row> cloudMetaDf = source
-        .filter("s3.object.size > 0")
+        .filter(filter)
         .select("s3.bucket.name", "s3.object.key")
         .distinct()
         .collectAsList();
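
For illustration, with a hypothetical key prefix the filter string passed to Spark above composes as follows (a sketch; the null/empty check stands in for StringUtils.isNullOrEmpty):

    String keyPrefix = "warehouse/orders/";   // hypothetical Config.S3_KEY_PREFIX value
    String filter = "s3.object.size > 0";
    if (keyPrefix != null && !keyPrefix.isEmpty()) {
      filter = filter + " and s3.object.key like '" + keyPrefix + "%'";
    }
    // filter -> "s3.object.size > 0 and s3.object.key like 'warehouse/orders/%'"
    // zero-byte markers are still skipped; only keys under the prefix are ingested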
@@ -113,9 +126,9 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
     for (Row row : cloudMetaDf) {
       // construct file path, row index 0 refers to bucket and 1 refers to key
       String bucket = row.getString(0);
-      String filePath = S3_PREFIX + bucket + "/" + row.getString(1);
+      String filePath = s3Prefix + bucket + "/" + row.getString(1);
       if (checkExists) {
-        FileSystem fs = FSUtils.getFs(S3_PREFIX + bucket, sparkSession.sparkContext().hadoopConfiguration());
+        FileSystem fs = FSUtils.getFs(s3Prefix + bucket, sparkSession.sparkContext().hadoopConfiguration());
         try {
           if (fs.exists(new Path(filePath))) {
             cloudFiles.add(filePath);
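
As a usage sketch with hypothetical bucket and key values (a plain Hadoop Configuration stands in for the Spark context's, and the usual imports of FSUtils, FileSystem, Path and IOException are assumed), the loop above now produces paths under the configured scheme and, when the existence check is enabled, probes them through the Hadoop FileSystem API:

    String s3Prefix = "s3a://";                           // derived from fs.prefix = "s3a"
    String bucket = "my-bucket";                          // s3.bucket.name (hypothetical)
    String key = "warehouse/orders/part-00000.parquet";   // s3.object.key (hypothetical)
    String filePath = s3Prefix + bucket + "/" + key;      // s3a://my-bucket/warehouse/orders/part-00000.parquet

    FileSystem fs = FSUtils.getFs(s3Prefix + bucket, new Configuration());
    try {
      if (fs.exists(new Path(filePath))) {
        // only paths that exist would be queued for ingestion
      }
    } catch (IOException e) {
      // handle or log the failed existence check (the catch block is not shown in the hunk above)
    }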