[HUDI-3745] Support for spark datasource options in S3EventsHoodieIncrSource (#5170)
This commit is contained in:
@@ -26,14 +26,18 @@ import org.apache.hudi.common.model.HoodieRecord;
|
|||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||||
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
|
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
|
||||||
|
|
||||||
|
import com.esotericsoftware.minlog.Log;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
|
import org.apache.spark.sql.DataFrameReader;
|
||||||
import org.apache.spark.sql.Dataset;
|
import org.apache.spark.sql.Dataset;
|
||||||
import org.apache.spark.sql.Row;
|
import org.apache.spark.sql.Row;
|
||||||
import org.apache.spark.sql.SparkSession;
|
import org.apache.spark.sql.SparkSession;
|
||||||
@@ -42,6 +46,7 @@ import java.io.IOException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
|
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_NUM_INSTANTS_PER_FETCH;
|
||||||
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
|
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.DEFAULT_READ_LATEST_INSTANT_ON_MISSING_CKPT;
|
||||||
@@ -50,7 +55,6 @@ import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_S
|
|||||||
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
|
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.NUM_INSTANTS_PER_FETCH;
|
||||||
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
|
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.READ_LATEST_INSTANT_ON_MISSING_CKPT;
|
||||||
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
|
import static org.apache.hudi.utilities.sources.HoodieIncrSource.Config.SOURCE_FILE_FORMAT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}.
|
* This source will use the S3 events meta information from hoodie table generate by {@link S3EventsSource}.
|
||||||
*/
|
*/
|
||||||
@@ -71,6 +75,12 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
|
|||||||
static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
|
static final String S3_IGNORE_KEY_PREFIX = "hoodie.deltastreamer.source.s3incr.ignore.key.prefix";
|
||||||
// control whether to ignore the s3 objects with this substring
|
// control whether to ignore the s3 objects with this substring
|
||||||
static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring";
|
static final String S3_IGNORE_KEY_SUBSTRING = "hoodie.deltastreamer.source.s3incr.ignore.key.substring";
|
||||||
|
/**
|
||||||
|
*{@value #SPARK_DATASOURCE_OPTIONS} is json string, passed to the reader while loading dataset.
|
||||||
|
* Example delta streamer conf
|
||||||
|
* - --hoodie-conf hoodie.deltastreamer.source.s3incr.spark.datasource.options={"header":"true","encoding":"UTF-8"}
|
||||||
|
*/
|
||||||
|
static final String SPARK_DATASOURCE_OPTIONS = "hoodie.deltastreamer.source.s3incr.spark.datasource.options";
|
||||||
}
|
}
|
||||||
|
|
||||||
public S3EventsHoodieIncrSource(
|
public S3EventsHoodieIncrSource(
|
||||||
@@ -81,6 +91,22 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
|
|||||||
super(props, sparkContext, sparkSession, schemaProvider);
|
super(props, sparkContext, sparkSession, schemaProvider);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private DataFrameReader getDataFrameReader(String fileFormat) {
|
||||||
|
DataFrameReader dataFrameReader = sparkSession.read().format(fileFormat);
|
||||||
|
if (!StringUtils.isNullOrEmpty(props.getString(Config.SPARK_DATASOURCE_OPTIONS, null))) {
|
||||||
|
final ObjectMapper mapper = new ObjectMapper();
|
||||||
|
Map<String, String> sparkOptionsMap = null;
|
||||||
|
try {
|
||||||
|
sparkOptionsMap = mapper.readValue(props.getString(Config.SPARK_DATASOURCE_OPTIONS), Map.class);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieException(String.format("Failed to parse sparkOptions: %s", props.getString(Config.SPARK_DATASOURCE_OPTIONS)), e);
|
||||||
|
}
|
||||||
|
Log.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
|
||||||
|
dataFrameReader = dataFrameReader.options(sparkOptionsMap);
|
||||||
|
}
|
||||||
|
return dataFrameReader;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
|
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkptStr, long sourceLimit) {
|
||||||
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
|
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(HOODIE_SRC_BASE_PATH));
|
||||||
@@ -174,7 +200,8 @@ public class S3EventsHoodieIncrSource extends HoodieIncrSource {
|
|||||||
}
|
}
|
||||||
Option<Dataset<Row>> dataset = Option.empty();
|
Option<Dataset<Row>> dataset = Option.empty();
|
||||||
if (!cloudFiles.isEmpty()) {
|
if (!cloudFiles.isEmpty()) {
|
||||||
dataset = Option.of(sparkSession.read().format(fileFormat).load(cloudFiles.toArray(new String[0])));
|
DataFrameReader dataFrameReader = getDataFrameReader(fileFormat);
|
||||||
|
dataset = Option.of(dataFrameReader.load(cloudFiles.toArray(new String[0])));
|
||||||
}
|
}
|
||||||
return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
|
return Pair.of(dataset, queryTypeAndInstantEndpts.getRight().getRight());
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user