New Features in DeltaStreamer:

(1) Apply transformations when using delta-streamer to ingest data (see the sketch after this list)
(2) Add a Hudi Incremental Source for Delta Streamer
(3) Allow delta-streamer config properties to be passed on the command line
(4) Add Hive integration to Delta Streamer and address review comments
(5) Ensure MultiPartKeysValueExtractor handles Hive-style partition descriptions
(6) Reuse the same Spark session for both source and transformer
(7) Support extracting partition fields from _hoodie_partition_path for HoodieIncrSource
(8) Reuse binary Avro coders
(9) Add push-down filters for the incremental source
(10) Add Hoodie DeltaStreamer metrics to track total time taken
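
Taken together, items (1), (6), and (9) mean a delta-streamer pipeline can reshape rows between the source and the Hudi sink while sharing one Spark session. The transformer hook itself lives in the delta-streamer module and is not shown in this excerpt; the sketch below only illustrates the kind of Dataset[Row]-to-Dataset[Row] function such a hook would wrap (the object and column names are made up):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Hypothetical transformation applied between source fetch and Hudi write:
// drop obviously bad fares and stamp each row with an ingestion timestamp.
// The shared SparkSession is passed in (item 6) even though this simple projection does not need it.
object TripCleanupTransform {
  def apply(spark: SparkSession, source: Dataset[Row]): Dataset[Row] = {
    source
      .filter(col("fare") >= 0)
      .withColumn("ingest_ts", current_timestamp())
  }
}
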
Balaji Varadarajan
2018-10-10 10:31:34 -07:00
committed by vinoth chandar
parent c70dbc13e9
commit 3a0044216c
65 changed files with 2752 additions and 911 deletions

View File

@@ -29,8 +29,13 @@ import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.exception.DatasetNotFoundException;
import com.uber.hoodie.exception.HoodieException;
import com.uber.hoodie.exception.HoodieNotSupportedException;
import com.uber.hoodie.hive.HiveSyncConfig;
import com.uber.hoodie.hive.PartitionValueExtractor;
import com.uber.hoodie.hive.SlashEncodedDayPartitionValueExtractor;
import com.uber.hoodie.index.HoodieIndex;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -86,6 +91,17 @@ public class DataSourceUtils {
}
}
/**
* Create a partition value extractor class via reflection.
*/
public static PartitionValueExtractor createPartitionExtractor(String partitionExtractorClass) {
try {
return (PartitionValueExtractor) ReflectionUtils.loadClass(partitionExtractorClass);
} catch (Throwable e) {
throw new HoodieException("Could not load partition extractor class " + partitionExtractorClass, e);
}
}
/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
@@ -169,4 +185,28 @@ public class DataSourceUtils {
.withProps(parameters).build();
return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
}
public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, String basePath) {
checkRequiredProperties(props, Arrays.asList(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()));
HiveSyncConfig hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.basePath = basePath;
hiveSyncConfig.assumeDatePartitioning =
props.getBoolean(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(),
Boolean.valueOf(DataSourceWriteOptions.DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL()));
hiveSyncConfig.databaseName = props.getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_HIVE_DATABASE_OPT_VAL());
hiveSyncConfig.tableName = props.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
hiveSyncConfig.hiveUser = props.getString(DataSourceWriteOptions.HIVE_USER_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_HIVE_USER_OPT_VAL());
hiveSyncConfig.hivePass = props.getString(DataSourceWriteOptions.HIVE_PASS_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_HIVE_PASS_OPT_VAL());
hiveSyncConfig.jdbcUrl = props.getString(DataSourceWriteOptions.HIVE_URL_OPT_KEY(),
DataSourceWriteOptions.DEFAULT_HIVE_URL_OPT_VAL());
hiveSyncConfig.partitionFields =
props.getStringList(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), ",", new ArrayList<>());
hiveSyncConfig.partitionValueExtractorClass =
props.getString(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
SlashEncodedDayPartitionValueExtractor.class.getName());
return hiveSyncConfig;
}
}
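
For orientation, a sketch of how a caller might drive the two helpers added above. Only the class and field names appear in this diff, so the package paths below are assumptions, and the database, table, JDBC URL, partition field, and base path are made-up values:

import com.uber.hoodie.common.util.TypedProperties               // package path assumed
import com.uber.hoodie.{DataSourceUtils, DataSourceWriteOptions} // package paths assumed

val props = new TypedProperties()
props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "analytics")
props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, "trips") // the only required key
props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://hiveserver:10000")
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr")

val hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, "/data/hoodie/trips")
// The extractor class defaults to SlashEncodedDayPartitionValueExtractor and is instantiated via reflection.
val extractor = DataSourceUtils.createPartitionExtractor(hiveSyncConfig.partitionValueExtractorClass)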

View File

@@ -22,13 +22,18 @@ import java.sql.{Date, Timestamp}
import java.util
import com.databricks.spark.avro.SchemaConverters
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import com.databricks.spark.avro.SchemaConverters.IncompatibleSchemaException
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericData.{Fixed, Record}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import scala.collection.JavaConverters._
object AvroConversionUtils {
@@ -46,6 +51,22 @@ object AvroConversionUtils {
}
}
def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss : SparkSession): Dataset[Row] = {
if (rdd.isEmpty()) {
ss.emptyDataFrame
} else {
ss.createDataFrame(rdd.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val schema = Schema.parse(schemaStr)
val dataType = convertAvroSchemaToStructType(schema)
val convertor = createConverterToRow(schema, dataType)
records.map { x => convertor(x).asInstanceOf[Row] }
}
}, convertAvroSchemaToStructType(Schema.parse(schemaStr))).asInstanceOf[Dataset[Row]]
}
}
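
A driver-side sketch of how the new createDataFrame helper might be fed. The Trip schema and the AvroConversionUtils package path are assumptions; Kryo is enabled because Avro GenericRecords are not Java-serializable:

import com.uber.hoodie.AvroConversionUtils // package path assumed
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("avro-to-rows").master("local[*]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // GenericRecord is not Serializable
  .getOrCreate()

val schemaStr =
  """{"type":"record","name":"Trip","fields":[
    |{"name":"id","type":"string"},{"name":"fare","type":"double"}]}""".stripMargin

val records = spark.sparkContext.parallelize(Seq(("t1", 12.5), ("t2", 7.0))).map { case (id, fare) =>
  val schema = new Schema.Parser().parse(schemaStr) // parse on the executor; only the schema string is shipped
  val rec = new GenericData.Record(schema)
  rec.put("id", id)
  rec.put("fare", fare)
  rec.asInstanceOf[GenericRecord]
}

AvroConversionUtils.createDataFrame(records, schemaStr, spark).show()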
def getNewRecordNamespace(elementDataType: DataType,
currentRecordNamespace: String,
elementName: String): String = {
@@ -56,6 +77,185 @@ object AvroConversionUtils {
}
}
/**
* NOTE: This part of the code is copied from com.databricks.spark.avro.SchemaConverters.scala (133:310) (spark-avro)
*
* Returns a converter function to convert a row in avro format to a GenericRow of catalyst.
*
* @param sourceAvroSchema Source schema before conversion, inferred from the avro file or passed in
* by the user.
* @param targetSqlType Target catalyst sql type after the conversion.
* @return a converter function to convert a row in avro format to a GenericRow of catalyst.
*/
def createConverterToRow(sourceAvroSchema: Schema,
targetSqlType: DataType): AnyRef => AnyRef = {
def createConverter(avroSchema: Schema,
sqlType: DataType, path: List[String]): AnyRef => AnyRef = {
val avroType = avroSchema.getType
(sqlType, avroType) match {
// Avro strings are in Utf8, so we have to call toString on them
case (StringType, STRING) | (StringType, ENUM) =>
(item: AnyRef) => if (item == null) null else item.toString
// Byte arrays are reused by avro, so we have to make a copy of them.
case (IntegerType, INT) | (BooleanType, BOOLEAN) | (DoubleType, DOUBLE) |
(FloatType, FLOAT) | (LongType, LONG) =>
identity
case (BinaryType, FIXED) =>
(item: AnyRef) =>
if (item == null) {
null
} else {
item.asInstanceOf[Fixed].bytes().clone()
}
case (BinaryType, BYTES) =>
(item: AnyRef) =>
if (item == null) {
null
} else {
val byteBuffer = item.asInstanceOf[ByteBuffer]
val bytes = new Array[Byte](byteBuffer.remaining)
byteBuffer.get(bytes)
bytes
}
case (struct: StructType, RECORD) =>
val length = struct.fields.length
val converters = new Array[AnyRef => AnyRef](length)
val avroFieldIndexes = new Array[Int](length)
var i = 0
while (i < length) {
val sqlField = struct.fields(i)
val avroField = avroSchema.getField(sqlField.name)
if (avroField != null) {
val converter = createConverter(avroField.schema(), sqlField.dataType,
path :+ sqlField.name)
converters(i) = converter
avroFieldIndexes(i) = avroField.pos()
} else if (!sqlField.nullable) {
throw new IncompatibleSchemaException(
s"Cannot find non-nullable field ${sqlField.name} at path ${path.mkString(".")} " +
"in Avro schema\n" +
s"Source Avro schema: $sourceAvroSchema.\n" +
s"Target Catalyst type: $targetSqlType")
}
i += 1
}
(item: AnyRef) => {
if (item == null) {
null
} else {
val record = item.asInstanceOf[GenericRecord]
val result = new Array[Any](length)
var i = 0
while (i < converters.length) {
if (converters(i) != null) {
val converter = converters(i)
result(i) = converter(record.get(avroFieldIndexes(i)))
}
i += 1
}
new GenericRow(result)
}
}
case (arrayType: ArrayType, ARRAY) =>
val elementConverter = createConverter(avroSchema.getElementType, arrayType.elementType,
path)
val allowsNull = arrayType.containsNull
(item: AnyRef) => {
if (item == null) {
null
} else {
item.asInstanceOf[java.lang.Iterable[AnyRef]].asScala.map { element =>
if (element == null && !allowsNull) {
throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " +
"allowed to be null")
} else {
elementConverter(element)
}
}
}
}
case (mapType: MapType, MAP) if mapType.keyType == StringType =>
val valueConverter = createConverter(avroSchema.getValueType, mapType.valueType, path)
val allowsNull = mapType.valueContainsNull
(item: AnyRef) => {
if (item == null) {
null
} else {
item.asInstanceOf[java.util.Map[AnyRef, AnyRef]].asScala.map { x =>
if (x._2 == null && !allowsNull) {
throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " +
"allowed to be null")
} else {
(x._1.toString, valueConverter(x._2))
}
}.toMap
}
}
case (sqlType, UNION) =>
if (avroSchema.getTypes.asScala.exists(_.getType == NULL)) {
val remainingUnionTypes = avroSchema.getTypes.asScala.filterNot(_.getType == NULL)
if (remainingUnionTypes.size == 1) {
createConverter(remainingUnionTypes.head, sqlType, path)
} else {
createConverter(Schema.createUnion(remainingUnionTypes.asJava), sqlType, path)
}
} else avroSchema.getTypes.asScala.map(_.getType) match {
case Seq(t1) => createConverter(avroSchema.getTypes.get(0), sqlType, path)
case Seq(a, b) if Set(a, b) == Set(INT, LONG) && sqlType == LongType =>
(item: AnyRef) => {
item match {
case null => null
case l: java.lang.Long => l
case i: java.lang.Integer => new java.lang.Long(i.longValue())
}
}
case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && sqlType == DoubleType =>
(item: AnyRef) => {
item match {
case null => null
case d: java.lang.Double => d
case f: java.lang.Float => new java.lang.Double(f.doubleValue())
}
}
case other =>
sqlType match {
case t: StructType if t.fields.length == avroSchema.getTypes.size =>
val fieldConverters = t.fields.zip(avroSchema.getTypes.asScala).map {
case (field, schema) =>
createConverter(schema, field.dataType, path :+ field.name)
}
(item: AnyRef) => if (item == null) {
null
} else {
val i = GenericData.get().resolveUnion(avroSchema, item)
val converted = new Array[Any](fieldConverters.length)
converted(i) = fieldConverters(i)(item)
new GenericRow(converted)
}
case _ => throw new IncompatibleSchemaException(
s"Cannot convert Avro schema to catalyst type because schema at path " +
s"${path.mkString(".")} is not compatible " +
s"(avroType = $other, sqlType = $sqlType). \n" +
s"Source Avro schema: $sourceAvroSchema.\n" +
s"Target Catalyst type: $targetSqlType")
}
}
case (left, right) =>
throw new IncompatibleSchemaException(
s"Cannot convert Avro schema to catalyst type because schema at path " +
s"${path.mkString(".")} is not compatible (avroType = $left, sqlType = $right). \n" +
s"Source Avro schema: $sourceAvroSchema.\n" +
s"Target Catalyst type: $targetSqlType")
}
}
createConverter(sourceAvroSchema, targetSqlType, List.empty[String])
}
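
To make the copied converter concrete, here is a small standalone sketch that turns one Avro record into a Catalyst Row; the Trip schema is made up and the AvroConversionUtils package path is assumed:

import com.uber.hoodie.AvroConversionUtils // package path assumed
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.Row

val schemaStr =
  """{"type":"record","name":"Trip","fields":[{"name":"id","type":"string"},{"name":"fare","type":"double"}]}"""
val schema = new Schema.Parser().parse(schemaStr)

val rec = new GenericData.Record(schema)
rec.put("id", "t1")
rec.put("fare", 12.5)

// Derive the Catalyst type from the Avro schema, build the converter, then apply it to a single record.
val sqlType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
val toRow = AvroConversionUtils.createConverterToRow(schema, sqlType)
val row = toRow(rec).asInstanceOf[Row] // GenericRow("t1", 12.5)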
def createConverterToAvro(dataType: DataType,
structName: String,
recordNamespace: String): Any => Any = {

View File

@@ -43,7 +43,7 @@ object DataSourceReadOptions {
val VIEW_TYPE_INCREMENTAL_OPT_VAL = "incremental"
val VIEW_TYPE_REALTIME_OPT_VAL = "realtime"
val DEFAULT_VIEW_TYPE_OPT_VAL = VIEW_TYPE_READ_OPTIMIZED_OPT_VAL
val DEFAULT_PUSH_DOWN_FILTERS_OPT_VAL = ""
/**
* Instant time to start incrementally pulling data from. The instanttime here need not
@@ -64,6 +64,13 @@ object DataSourceReadOptions {
*
*/
val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
/**
* For use-cases like DeltaStreamer, which reads from a Hoodie incremental table and applies opaque map
* functions, filters appearing late in the sequence of transformations cannot be automatically pushed down.
* This option allows setting such filters directly on the Hoodie source.
*/
val PUSH_DOWN_INCR_FILTERS_OPT_KEY = "hoodie.datasource.read.incr.filters"
}
/**

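On the read side, the new key is passed like any other datasource option. A sketch of an incremental read with a pushed-down filter follows; the format name, base path, instant times, and filter expression are illustrative, and VIEW_TYPE_OPT_KEY is assumed to exist alongside the *_OPT_VAL constants shown above:

import com.uber.hoodie.DataSourceReadOptions // package path assumed
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("incr-read").master("local[*]").getOrCreate()

val incremental = spark.read
  .format("com.uber.hoodie") // datasource format name assumed
  .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY, DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20181001000000")
  .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20181010000000")
  // comma-separated SQL predicates applied directly on the incremental scan
  .option(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY, "fare > 10.0")
  .load("/data/hoodie/trips")
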
View File

@@ -64,21 +64,33 @@ class IncrementalRelation(val sqlContext: SQLContext,
throw new HoodieException(s"Specify the begin instant time to pull from using " +
s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
}
val lastInstant = commitTimeline.lastInstant().get()
val commitsToReturn = commitTimeline.findInstantsInRange(
optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,
commitTimeline.lastInstant().get().getTimestamp))
optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
.getInstants.iterator().toList
// use schema from a file produced in the latest instant
val latestSchema = {
// use last instant if instant range is empty
val instant = commitsToReturn.lastOption.getOrElse(lastInstant)
val latestMeta = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(commitsToReturn.last).get, classOf[HoodieCommitMetadata])
.fromBytes(commitTimeline.getInstantDetails(instant).get, classOf[HoodieCommitMetadata])
val metaFilePath = latestMeta.getFileIdAndFullPaths(basePath).values().iterator().next()
AvroConversionUtils.convertAvroSchemaToStructType(ParquetUtils.readAvroSchema(
sqlContext.sparkContext.hadoopConfiguration, new Path(metaFilePath)))
}
val filters = {
if (optParams.contains(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY)) {
val filterStr = optParams.get(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS_OPT_KEY).getOrElse("")
filterStr.split(",").filter(!_.isEmpty)
} else {
Array[String]()
}
}
override def schema: StructType = latestSchema
override def buildScan(): RDD[Row] = {
@@ -92,12 +104,17 @@ class IncrementalRelation(val sqlContext: SQLContext,
// will filter out all the files incorrectly.
sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
val sOpts = optParams.filter(p => !p._1.equalsIgnoreCase("path"))
sqlContext.read.options(sOpts)
.schema(latestSchema) // avoid AnalysisException for empty input
.parquet(fileIdToFullPath.values.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp))
.toDF().rdd
if (fileIdToFullPath.isEmpty) {
sqlContext.sparkContext.emptyRDD[Row]
} else {
log.info("Additional filters to be applied to the incremental source: " + filters.mkString(","))
filters.foldLeft(sqlContext.read.options(sOpts)
.schema(latestSchema)
.parquet(fileIdToFullPath.values.toList: _*)
.filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp))
.filter(String.format("%s <= '%s'",
HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)))((e, f) => e.filter(f))
.toDF().rdd
}
}
}
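
The foldLeft above simply threads each pushed-down filter string through an extra .filter(...) call on the DataFrame. A tiny standalone illustration of that pattern (column names and values are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("fold-filters").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("rider-001", 12.5), ("rider-002", 7.0)).toDF("rider", "fare")
val filters = Seq("fare > 10.0", "rider = 'rider-001'")

// Equivalent to df.filter("fare > 10.0").filter("rider = 'rider-001'")
val filtered = filters.foldLeft(df)((acc, f) => acc.filter(f))
filtered.show() // only rider-001 remains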

View File

@@ -100,7 +100,6 @@ class DataSourceTest extends AssertionsForJUnit {
.load(basePath + "/*/*/*/*");
assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
// Read Incremental View
// we have 2 commits, try pulling the first commit (which is not the latest)
val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0);