1
0

[HUDI-1550] Honor ordering field for MOR Spark datasource reader (#2497)

This commit is contained in:
pengzhiwei
2021-02-01 21:04:27 +08:00
committed by GitHub
parent f159c0c49a
commit 0d8a4d0a56
8 changed files with 138 additions and 20 deletions

View File

@@ -26,6 +26,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.model.HoodiePayloadProps
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
@@ -34,6 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafePro
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.vectorized.ColumnarBatch
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
@@ -48,7 +51,14 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
extends RDD[InternalRow](sc, Nil) {
private val confBroadcast = sc.broadcast(new SerializableWritable(config))
private val preCombineField = tableState.preCombineField
// Payload properties carrying the ordering (pre-combine) field name; defined only when
// the table state provides a pre-combine field.
private val payloadProps = preCombineField.map { orderingField =>
  val props = new Properties()
  props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, orderingField)
  props
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
mergeOnReadPartition.split match {
@@ -285,7 +295,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
// Merges a base-file row with the log record stored under the same key.
//
// The row is serialized to an Avro record and handed to the log record's payload via
// combineAndGetUpdateValue. When payloadProps is defined it is passed along as the third
// argument; otherwise the two-argument overload is used. The merge is invoked exactly
// once: the earlier unconditional call, whose result was discarded, has been removed.
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
  val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
  payloadProps match {
    case Some(props) =>
      logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord,
        tableAvroSchema, props)
    case None =>
      logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
  }
}
}
}

View File

@@ -113,7 +113,7 @@ private[hudi] object HoodieSparkSqlWriter {
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
null.asInstanceOf[String])
null.asInstanceOf[String], parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
tableConfig = tableMetaClient.getTableConfig
}
@@ -263,7 +263,8 @@ private[hudi] object HoodieSparkSqlWriter {
HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
null, bootstrapIndexClass, bootstrapBasePath)
null, parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null),
bootstrapIndexClass, bootstrapBasePath)
}
val jsc = new JavaSparkContext(sqlContext.sparkContext)

View File

@@ -24,7 +24,6 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.LogManager
@@ -78,7 +77,16 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
private val fileIndex = buildFileIndex()
// Pre-combine field used by the reader. The value from the table config takes precedence;
// when it is null (e.g. an old table that has not stored the field in hoodie.properties)
// the READ_PRE_COMBINE_FIELD read option is consulted instead.
private val preCombineField =
  Option(metaClient.getTableConfig.getPreCombineField)
    .orElse(optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD))
override def schema: StructType = tableStructSchema
override def needConversion: Boolean = false
@@ -117,7 +125,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
requiredStructSchema,
tableAvroSchema.toString,
requiredAvroSchema.toString,
fileIndex
fileIndex,
preCombineField
)
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,

View File

@@ -24,10 +24,8 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -50,7 +48,8 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
requiredStructSchema: StructType,
tableAvroSchema: String,
requiredAvroSchema: String,
hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit])
hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit],
preCombineField: Option[String])
class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
val optParams: Map[String, String],
@@ -70,7 +69,16 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
private val fileIndex = buildFileIndex()
// Pre-combine field used by the reader. The value from the table config takes precedence;
// when it is null (e.g. an old table that has not stored the field in hoodie.properties)
// the READ_PRE_COMBINE_FIELD read option is consulted instead.
private val preCombineField =
  Option(metaClient.getTableConfig.getPreCombineField)
    .orElse(optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD))
override def schema: StructType = tableStructSchema
override def needConversion: Boolean = false
@@ -92,7 +100,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
requiredStructSchema,
tableAvroSchema.toString,
requiredAvroSchema.toString,
fileIndex
fileIndex,
preCombineField
)
val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
sparkSession = sqlContext.sparkSession,

View File

@@ -17,11 +17,14 @@
package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.log4j.LogManager
import org.apache.spark.sql._
@@ -29,7 +32,6 @@ import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.collection.JavaConversions._
/**
@@ -502,6 +504,44 @@ class TestMORDataSource extends HoodieClientTestBase {
hudiSnapshotDF2.show(1)
}
@Test
def testPreCombineFiledForReadMOR(): Unit = {
  // Each step is (record written, row expected from the read that follows).
  val steps = Seq(
    // Initial insert.
    ((1, "a0", 10, 100), (1, "a0", 10, 100)),
    // The value is not updated, because version 99 < 100.
    ((1, "a0", 12, 99), (1, "a0", 10, 100)),
    // The value is updated, because version 101 > 100.
    ((1, "a0", 12, 101), (1, "a0", 12, 101))
  )
  steps.foreach { case (written, expected) =>
    writeData(written)
    checkAnswer(expected)
  }
}
// Appends a single (id, name, value, version) row to the table at basePath, written as a
// MOR table with DefaultHoodieRecordPayload, "id" as record key and "version" as the
// pre-combine field.
private def writeData(data: (Int, String, Int, Int)): Unit = {
  val session = spark
  import session.implicits._
  val singleRowDf = Seq(data).toDF("id", "name", "value", "version")
  val hudiWriter = singleRowDf.write.format("org.apache.hudi")
    .options(commonOpts)
    // DefaultHoodieRecordPayload is the payload class under test here.
    .option(PAYLOAD_CLASS_OPT_KEY, classOf[DefaultHoodieRecordPayload].getCanonicalName)
    .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD_OPT_KEY, "id")
    .option(PRECOMBINE_FIELD_OPT_KEY, "version")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "")
    .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[NonpartitionedKeyGenerator].getName)
  hudiWriter.mode(SaveMode.Append).save(basePath)
}
// Reads the table back and asserts that the first returned row, projected to
// (id, name, value, version), equals the expected tuple.
private def checkAnswer(expect: (Int, String, Int, Int)): Unit = {
  val expectedRow = Row.fromSeq(expect.productIterator.toSeq)
  val projected = spark.read.format("org.apache.hudi")
    .load(basePath + "/*")
    .select("id", "name", "value", "version")
  val firstRow = projected.take(1)(0)
  assertEquals(expectedRow, firstRow)
}
def verifySchemaAndTypes(df: DataFrame): Unit = {
assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
df.select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")