diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 7e2dffbfe..f519c919e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -56,6 +56,8 @@ public class HoodieTableConfig implements Serializable {
   public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name";
   public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version";
+  public static final String HOODIE_TABLE_PRECOMBINE_FIELD = "hoodie.table.precombine.field";
+
   @Deprecated
   public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format";
   @Deprecated
@@ -187,6 +189,10 @@ public class HoodieTableConfig implements Serializable {
         "org.apache.hudi");
   }
 
+  public String getPreCombineField() {
+    return props.getProperty(HOODIE_TABLE_PRECOMBINE_FIELD);
+  }
+
   /**
    * Read the payload class for HoodieRecords from the table properties.
    */
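The getter returns `null` when `hoodie.table.precombine.field` is absent, which is what the reader-side fallbacks later in this patch key off. As a minimal round-trip sketch, using the `initTableType` overload added below in `HoodieTableMetaClient` (the path, table name, and field name are illustrative, not from the patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

// Initialize a table whose hoodie.properties records the precombine field,
// then read it back through the new HoodieTableConfig#getPreCombineField.
val metaClient = HoodieTableMetaClient.initTableType(
  new Configuration(), "/tmp/hudi_precombine_demo", HoodieTableType.MERGE_ON_READ,
  "demo_table", "org.apache.hudi.common.model.DefaultHoodieRecordPayload", "version")

// "version" here; null for tables created before this change.
val preCombine = metaClient.getTableConfig.getPreCombineField
```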
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 2e8857b2f..9c1e6fc73 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -326,40 +326,73 @@ public class HoodieTableMetaClient implements Serializable {
    * Helper method to initialize a table, with given basePath, tableType, name, archiveFolder, payloadClass and
    * base file format.
    */
+  public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                                                 String tableName, String archiveLogFolder, String payloadClassName,
+                                                                 String baseFileFormat, String preCombineField, String bootstrapIndexClass,
+                                                                 String bootstrapBasePath) throws IOException {
+    return initTableType(hadoopConf, basePath, tableType, tableName,
+        archiveLogFolder, payloadClassName, null,
+        baseFileFormat, preCombineField, bootstrapIndexClass, bootstrapBasePath);
+  }
+
   public static HoodieTableMetaClient initTableTypeWithBootstrap(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                                                  String tableName, String archiveLogFolder, String payloadClassName,
                                                                  String baseFileFormat, String bootstrapIndexClass,
                                                                  String bootstrapBasePath) throws IOException {
     return initTableType(hadoopConf, basePath, tableType, tableName,
-        archiveLogFolder, payloadClassName, null, baseFileFormat, bootstrapIndexClass, bootstrapBasePath);
+        archiveLogFolder, payloadClassName, null,
+        baseFileFormat, null, bootstrapIndexClass, bootstrapBasePath);
+  }
+
+  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                                    String tableName, String archiveLogFolder, String payloadClassName,
+                                                    String baseFileFormat, String preCombineField) throws IOException {
+    return initTableType(hadoopConf, basePath, tableType, tableName,
+        archiveLogFolder, payloadClassName, null, baseFileFormat, preCombineField,
+        null, null);
   }
 
   public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                                     String tableName, String archiveLogFolder, String payloadClassName,
                                                     String baseFileFormat) throws IOException {
     return initTableType(hadoopConf, basePath, tableType, tableName,
-        archiveLogFolder, payloadClassName, null, baseFileFormat, null, null);
+        archiveLogFolder, payloadClassName, null, baseFileFormat, null,
+        null, null);
   }
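Every pre-existing overload now delegates with a `null` precombine field, so current callers compile and behave as before. A hedged usage sketch of the new bootstrap overload (all paths and values below are placeholders, not from the patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient

val metaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(
  new Configuration(),
  "/tmp/hudi_bootstrap_target",                               // basePath
  HoodieTableType.MERGE_ON_READ,
  "bootstrap_demo",                                           // tableName
  "archived",                                                 // archiveLogFolder
  "org.apache.hudi.common.model.DefaultHoodieRecordPayload",  // payloadClassName
  null,                                                       // baseFileFormat: keep the default
  "version",                                                  // preCombineField, persisted to hoodie.properties
  null,                                                       // bootstrapIndexClass: keep the default
  "/tmp/source_parquet")                                      // bootstrapBasePath
```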
   /**
    * Used primarily by tests, examples.
    */
+  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                                    String tableName, String payloadClassName, String preCombineField) throws IOException {
+    return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName,
+        null, preCombineField);
+  }
+
   public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                                     String tableName, String payloadClassName) throws IOException {
     return initTableType(hadoopConf, basePath, tableType, tableName, null, payloadClassName,
-        null, null, null, null);
+        null, (String) null);
+  }
+
+  public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
+                                                    String tableName, String archiveLogFolder, String payloadClassName,
+                                                    String preCombineField, Integer timelineLayoutVersion) throws IOException {
+    return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName,
+        timelineLayoutVersion, null, preCombineField, null, null);
+  }
 
   public static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                                     String tableName, String archiveLogFolder, String payloadClassName,
                                                     Integer timelineLayoutVersion) throws IOException {
     return initTableType(hadoopConf, basePath, tableType, tableName, archiveLogFolder, payloadClassName,
-        timelineLayoutVersion, null, null, null);
+        timelineLayoutVersion, null, null, null, null);
   }
 
   private static HoodieTableMetaClient initTableType(Configuration hadoopConf, String basePath, HoodieTableType tableType,
                                                      String tableName, String archiveLogFolder, String payloadClassName,
-                                                     Integer timelineLayoutVersion, String baseFileFormat,
+                                                     Integer timelineLayoutVersion,
+                                                     String baseFileFormat, String preCombineField,
                                                      String bootstrapIndexClass, String bootstrapBasePath) throws IOException {
     Properties properties = new Properties();
     properties.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_PROP_NAME, tableName);
@@ -389,6 +422,9 @@ public class HoodieTableMetaClient implements Serializable {
       properties.put(HoodieTableConfig.HOODIE_BOOTSTRAP_BASE_PATH, bootstrapBasePath);
     }
 
+    if (null != preCombineField) {
+      properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, preCombineField);
+    }
     return HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath, properties);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index f0974977e..5151fe93d 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -64,6 +64,8 @@ object DataSourceReadOptions {
 
   val READ_PATHS_OPT_KEY = "hoodie.datasource.read.paths"
 
+  val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
+
   @Deprecated
   val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
   @Deprecated
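`READ_PRE_COMBINE_FIELD` deliberately aliases the write-side key `HoodieWriteConfig.PRECOMBINE_FIELD_PROP`, so readers of older tables (whose `hoodie.properties` predates `hoodie.table.precombine.field`) can pass the same option they already write with. A sketch, assuming an active `SparkSession` named `spark` and a placeholder path:

```scala
import org.apache.hudi.DataSourceReadOptions

// For an old table the field is not in hoodie.properties, so the
// relations fall back to this read option when merging log records.
val df = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.READ_PRE_COMBINE_FIELD, "version")
  .load("/tmp/old_mor_table/*")
```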
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index e20c33c28..d204d2d19 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.common.model.HoodiePayloadProps
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
@@ -34,6 +35,8 @@ import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafePro
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import java.util.Properties
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Try
@@ -48,7 +51,14 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   extends RDD[InternalRow](sc, Nil) {
 
   private val confBroadcast = sc.broadcast(new SerializableWritable(config))
-
+  private val preCombineField = tableState.preCombineField
+  private val payloadProps = if (preCombineField.isDefined) {
+    val properties = new Properties()
+    properties.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, preCombineField.get)
+    Some(properties)
+  } else {
+    None
+  }
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
     mergeOnReadPartition.split match {
@@ -285,7 +295,12 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
     private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
       val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
-      logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
+      if (payloadProps.isDefined) {
+        logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord,
+          tableAvroSchema, payloadProps.get)
+      } else {
+        logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
+      }
     }
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 2b3a34989..ad185cb17 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -113,7 +113,7 @@ private[hudi] object HoodieSparkSqlWriter {
           HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
         val tableMetaClient = HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get,
           tableType, tblName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
-          null.asInstanceOf[String])
+          null.asInstanceOf[String], parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
         tableConfig = tableMetaClient.getTableConfig
       }
@@ -263,7 +263,8 @@ private[hudi] object HoodieSparkSqlWriter {
           HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
         HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path,
           HoodieTableType.valueOf(tableType), tableName, archiveLogFolder, parameters(PAYLOAD_CLASS_OPT_KEY),
-          null, bootstrapIndexClass, bootstrapBasePath)
+          null, parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null),
+          bootstrapIndexClass, bootstrapBasePath)
       }
       val jsc = new JavaSparkContext(sqlContext.sparkContext)
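Passing `payloadProps` into `combineAndGetUpdateValue` is what lets an ordering-aware payload such as `DefaultHoodieRecordPayload` reject late-arriving updates at read-merge time. A simplified stand-in for the comparison this enables, not the payload's actual implementation:

```scala
import java.util.Properties
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.common.model.HoodiePayloadProps

val props = new Properties()
props.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP, "version")

// Keep the base-file record when its ordering value is higher than the
// log record's, i.e. when the log update arrived out of order.
def pickLatest(base: GenericRecord, logUpdate: GenericRecord, props: Properties): GenericRecord = {
  val field = props.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP)
  val baseVal = base.get(field).asInstanceOf[Comparable[AnyRef]]
  if (baseVal.compareTo(logUpdate.get(field).asInstanceOf[AnyRef]) > 0) base else logUpdate
}
```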
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index c85b97291..13766daaf 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -24,7 +24,6 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-
 import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
 import org.apache.hadoop.mapred.JobConf
 import org.apache.log4j.LogManager
@@ -78,7 +77,16 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
   private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
   private val fileIndex = buildFileIndex()
-
+  private val preCombineField = {
+    val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
+    if (preCombineFieldFromTableConfig != null) {
+      Some(preCombineFieldFromTableConfig)
+    } else {
+      // fall back to the read option for old tables that have not stored
+      // the precombine field in hoodie.properties
+      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD)
+    }
+  }
   override def schema: StructType = tableStructSchema
 
   override def needConversion: Boolean = false
@@ -117,7 +125,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
       requiredStructSchema,
       tableAvroSchema.toString,
       requiredAvroSchema.toString,
-      fileIndex
+      fileIndex,
+      preCombineField
     )
     val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
       sparkSession = sqlContext.sparkSession,
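The same two-step lookup reappears verbatim in `MergeOnReadSnapshotRelation` below; factored out, the resolution rule is just the following (a hypothetical helper for illustration, not part of the patch):

```scala
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.common.table.HoodieTableMetaClient

// Resolution order: (1) hoodie.properties, written by tables created after
// this change; (2) the read option, supplied by the user for older tables.
def resolvePreCombineField(metaClient: HoodieTableMetaClient,
                           optParams: Map[String, String]): Option[String] =
  Option(metaClient.getTableConfig.getPreCombineField)
    .orElse(optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD))
```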
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 328e3c3cf..50e2ec5f3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -24,10 +24,8 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -50,7 +48,8 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
                                        requiredStructSchema: StructType,
                                        tableAvroSchema: String,
                                        requiredAvroSchema: String,
-                                       hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit])
+                                       hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit],
+                                       preCombineField: Option[String])
 
 class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                   val optParams: Map[String, String],
@@ -70,7 +69,16 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
     DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
   private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
   private val fileIndex = buildFileIndex()
-
+  private val preCombineField = {
+    val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
+    if (preCombineFieldFromTableConfig != null) {
+      Some(preCombineFieldFromTableConfig)
+    } else {
+      // fall back to the read option for old tables that have not stored
+      // the precombine field in hoodie.properties
+      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD)
+    }
+  }
   override def schema: StructType = tableStructSchema
 
   override def needConversion: Boolean = false
@@ -92,7 +100,8 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       requiredStructSchema,
       tableAvroSchema.toString,
       requiredAvroSchema.toString,
-      fileIndex
+      fileIndex,
+      preCombineField
     )
     val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
       sparkSession = sqlContext.sparkSession,
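The test added below pins down the user-visible effect. As a spark-shell sketch (values mirror the test; the path is illustrative): after upserting `(id=1, value=10, version=100)` and then the late `(id=1, value=12, version=99)` with `DefaultHoodieRecordPayload` and the precombine field set to `version`, a snapshot read still returns the `version=100` row:

```scala
// The version=99 update loses the ordering-field comparison during log
// merging, so the newer row already on disk survives.
spark.read.format("org.apache.hudi")
  .load("/tmp/mor_precombine_demo/*")
  .select("id", "value", "version")
  .show()
// +---+-----+-------+
// | id|value|version|
// +---+-----+-------+
// |  1|   10|    100|
// +---+-----+-------+
```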
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 1ea6ceb87..92024a3c0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -17,11 +17,14 @@
 
 package org.apache.hudi.functional
 
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.log4j.LogManager
 import org.apache.spark.sql._
@@ -29,7 +32,6 @@ import org.apache.spark.sql.functions._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-
 import scala.collection.JavaConversions._
 
 /**
@@ -502,6 +504,44 @@ class TestMORDataSource extends HoodieClientTestBase {
     hudiSnapshotDF2.show(1)
   }
 
+  @Test
+  def testPreCombineFieldForReadMOR(): Unit = {
+    writeData((1, "a0", 10, 100))
+    checkAnswer((1, "a0", 10, 100))
+
+    writeData((1, "a0", 12, 99))
+    // The row is not updated, because version 99 < 100
+    checkAnswer((1, "a0", 10, 100))
+
+    writeData((1, "a0", 12, 101))
+    // The row is updated, because version 101 > 100
+    checkAnswer((1, "a0", 12, 101))
+  }
+
+  private def writeData(data: (Int, String, Int, Int)): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    val df = Seq(data).toDF("id", "name", "value", "version")
+    df.write.format("org.apache.hudi")
+      .options(commonOpts)
+      // use DefaultHoodieRecordPayload so merging honors the precombine field
+      .option(PAYLOAD_CLASS_OPT_KEY, classOf[DefaultHoodieRecordPayload].getCanonicalName)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(RECORDKEY_FIELD_OPT_KEY, "id")
+      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
+      .option(PARTITIONPATH_FIELD_OPT_KEY, "")
+      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[NonpartitionedKeyGenerator].getName)
+      .mode(SaveMode.Append)
+      .save(basePath)
+  }
+
+  private def checkAnswer(expect: (Int, String, Int, Int)): Unit = {
+    val readDf = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*")
+    val row1 = readDf.select("id", "name", "value", "version").take(1)(0)
+    assertEquals(Row(expect.productIterator.toSeq: _*), row1)
+  }
+
   def verifySchemaAndTypes(df: DataFrame): Unit = {
     assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
       df.select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
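A quick way to confirm the write path persisted the field is to read `hoodie.properties` directly (path illustrative; assumes the standard `.hoodie` metadata layout):

```scala
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Tables written with this patch should carry
//   hoodie.table.precombine.field=version
val propsPath = new Path("/tmp/mor_precombine_demo/.hoodie/hoodie.properties")
val fs = propsPath.getFileSystem(new Configuration())
val props = new Properties()
val in = fs.open(propsPath)
try props.load(in) finally in.close()
println(props.getProperty("hoodie.table.precombine.field")) // version
```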