[HUDI-3936] Fix projection for a nested field as pre-combined key (#5379)
This PR fixes the projection logic for a nested field that is used as the pre-combined key (precombine) field. The fix is to check and append only the root-level field for projection, i.e., "a" for a nested field "a.b.c" in the mandatory columns. Changes: HoodieBaseRelation.appendMandatoryColumns (renamed to appendMandatoryRootFields in this commit) now checks for and appends the root-level field of each required nested field in the mandatory columns.
This commit is contained in:
@@ -18,6 +18,17 @@
|
|||||||
|
|
||||||
package org.apache.hudi.avro;
|
package org.apache.hudi.avro;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.config.SerializableSchema;
|
||||||
|
import org.apache.hudi.common.model.HoodieOperation;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
import org.apache.hudi.common.model.HoodieRecordPayload;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
|
import org.apache.hudi.common.util.collection.Pair;
|
||||||
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.hudi.exception.SchemaCompatibilityException;
|
||||||
|
|
||||||
import org.apache.avro.AvroRuntimeException;
|
import org.apache.avro.AvroRuntimeException;
|
||||||
import org.apache.avro.Conversions;
|
import org.apache.avro.Conversions;
|
||||||
import org.apache.avro.Conversions.DecimalConversion;
|
import org.apache.avro.Conversions.DecimalConversion;
|
||||||
@@ -42,16 +53,6 @@ import org.apache.avro.io.EncoderFactory;
|
|||||||
import org.apache.avro.io.JsonDecoder;
|
import org.apache.avro.io.JsonDecoder;
|
||||||
import org.apache.avro.io.JsonEncoder;
|
import org.apache.avro.io.JsonEncoder;
|
||||||
import org.apache.avro.specific.SpecificRecordBase;
|
import org.apache.avro.specific.SpecificRecordBase;
|
||||||
import org.apache.hudi.common.config.SerializableSchema;
|
|
||||||
import org.apache.hudi.common.model.HoodieOperation;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
|
||||||
import org.apache.hudi.common.model.HoodieRecordPayload;
|
|
||||||
import org.apache.hudi.common.util.Option;
|
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
|
||||||
import org.apache.hudi.common.util.collection.Pair;
|
|
||||||
import org.apache.hudi.exception.HoodieException;
|
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
|
||||||
import org.apache.hudi.exception.SchemaCompatibilityException;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
@@ -480,6 +481,17 @@ public class HoodieAvroUtils {
|
|||||||
return projectedSchema;
|
return projectedSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain the root-level field name of a full field name, possibly a nested field.
|
||||||
|
* For example, given "a.b.c", the output is "a"; given "a", the output is "a".
|
||||||
|
*
|
||||||
|
* @param fieldName The field name.
|
||||||
|
* @return Root-level field name
|
||||||
|
*/
|
||||||
|
public static String getRootLevelFieldName(String fieldName) {
|
||||||
|
return fieldName.split("\\.")[0];
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
|
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -257,6 +257,13 @@ public class TestHoodieAvroUtils {
|
|||||||
assertEquals(expectedSchema, rec1.getSchema());
|
assertEquals(expectedSchema, rec1.getSchema());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Verifies that {@code HoodieAvroUtils.getRootLevelFieldName} returns the leading segment of a
 * dot-separated field name ("a.b.c" -> "a"), and returns the input unchanged when it contains
 * no dot ("a" -> "a", "" -> "").
 */
@Test
|
||||||
|
public void testGetRootLevelFieldName() {
|
||||||
|
assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a.b.c"));
|
||||||
|
assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a"));
|
||||||
|
assertEquals("", HoodieAvroUtils.getRootLevelFieldName(""));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetNestedFieldVal() {
|
public void testGetNestedFieldVal() {
|
||||||
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
|
||||||
|
|||||||
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
|
|||||||
import org.apache.spark.sql.SQLContext
|
import org.apache.spark.sql.SQLContext
|
||||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||||
import org.apache.spark.sql.execution.datasources._
|
import org.apache.spark.sql.execution.datasources._
|
||||||
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
|
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
|
||||||
import org.apache.spark.sql.hive.orc.OrcFileFormat
|
import org.apache.spark.sql.hive.orc.OrcFileFormat
|
||||||
import org.apache.spark.sql.sources.{BaseRelation, Filter}
|
import org.apache.spark.sql.sources.{BaseRelation, Filter}
|
||||||
import org.apache.spark.sql.types.StructType
|
import org.apache.spark.sql.types.StructType
|
||||||
@@ -54,7 +54,7 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,
|
|||||||
|
|
||||||
override type FileSplit = HoodieBaseFileSplit
|
override type FileSplit = HoodieBaseFileSplit
|
||||||
|
|
||||||
override lazy val mandatoryColumns: Seq[String] =
|
override lazy val mandatoryFields: Seq[String] =
|
||||||
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
|
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
|
||||||
Seq(recordKeyField)
|
Seq(recordKeyField)
|
||||||
|
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
|
|||||||
import org.apache.hadoop.mapred.JobConf
|
import org.apache.hadoop.mapred.JobConf
|
||||||
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
|
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
|
||||||
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
import org.apache.hudi.HoodieConversionUtils.toScalaOption
|
||||||
|
import org.apache.hudi.avro.HoodieAvroUtils
|
||||||
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
|
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
|
||||||
import org.apache.hudi.common.fs.FSUtils
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
|
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
|
||||||
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
|
|||||||
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
|
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
|
|
||||||
import org.apache.spark.sql.catalyst.InternalRow
|
import org.apache.spark.sql.catalyst.InternalRow
|
||||||
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
|
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
|
||||||
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
|
|
||||||
import org.apache.spark.sql.execution.FileRelation
|
import org.apache.spark.sql.execution.FileRelation
|
||||||
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
|
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
|
||||||
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
|
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
|
||||||
@@ -199,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
|||||||
*
|
*
|
||||||
* @VisibleInTests
|
* @VisibleInTests
|
||||||
*/
|
*/
|
||||||
val mandatoryColumns: Seq[String]
|
val mandatoryFields: Seq[String]
|
||||||
|
|
||||||
|
protected def mandatoryRootFields: Seq[String] =
|
||||||
|
mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
|
||||||
|
|
||||||
protected def timeline: HoodieTimeline =
|
protected def timeline: HoodieTimeline =
|
||||||
// NOTE: We're including compaction here since it's not considering a "commit" operation
|
// NOTE: We're including compaction here since it's not considering a "commit" operation
|
||||||
@@ -246,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
|||||||
//
|
//
|
||||||
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
|
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
|
||||||
// PROJECTION
|
// PROJECTION
|
||||||
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
|
val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)
|
||||||
|
|
||||||
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
|
val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
|
||||||
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
|
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
|
||||||
@@ -362,8 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
|
|||||||
!SubqueryExpression.hasSubquery(condition)
|
!SubqueryExpression.hasSubquery(condition)
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
|
protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
|
||||||
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
|
// For a nested field in mandatory columns, we should first get the root-level field, and then
|
||||||
|
// check for any missing column, as the requestedColumns should only contain root-level fields
|
||||||
|
// We should only append root-level field as well
|
||||||
|
val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
|
||||||
requestedColumns ++ missing
|
requestedColumns ++ missing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
|
|||||||
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
|
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
override lazy val mandatoryColumns: Seq[String] = {
|
override lazy val mandatoryFields: Seq[String] = {
|
||||||
// NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
|
// NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
|
||||||
// cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
|
// cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
|
||||||
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
|
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
|
|||||||
|
|
||||||
override type FileSplit = HoodieMergeOnReadFileSplit
|
override type FileSplit = HoodieMergeOnReadFileSplit
|
||||||
|
|
||||||
override lazy val mandatoryColumns: Seq[String] =
|
override lazy val mandatoryFields: Seq[String] =
|
||||||
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
|
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
|
||||||
|
|
||||||
protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
|
protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
|
|||||||
import org.apache.hudi.common.fs.FSUtils
|
import org.apache.hudi.common.fs.FSUtils
|
||||||
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
|
||||||
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
|
||||||
|
import org.apache.hudi.common.util.StringUtils
|
||||||
import org.apache.hudi.config.HoodieWriteConfig
|
import org.apache.hudi.config.HoodieWriteConfig
|
||||||
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
|
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
|
||||||
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
|
||||||
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
|
|||||||
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
|
||||||
import org.junit.jupiter.api.Tag
|
import org.junit.jupiter.api.Tag
|
||||||
import org.junit.jupiter.params.ParameterizedTest
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
import org.junit.jupiter.params.provider.ValueSource
|
import org.junit.jupiter.params.provider.CsvSource
|
||||||
|
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
|
|
||||||
@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val updatedVerificationVal: String = "driver_update"
|
val updatedVerificationVal: String = "driver_update"
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ValueSource(booleans = Array(true, false))
|
@CsvSource(Array(
|
||||||
def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
|
"true,",
|
||||||
val dataGen = new HoodieTestDataGenerator()
|
"true,fare.currency",
|
||||||
|
"false,",
|
||||||
|
"false,fare.currency"
|
||||||
|
))
|
||||||
|
def testMergeOnReadStorage(isMetadataEnabled: Boolean, preComineField: String) {
|
||||||
|
var options: Map[String, String] = commonOpts +
|
||||||
|
(HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
|
||||||
|
if (!StringUtils.isNullOrEmpty(preComineField)) {
|
||||||
|
options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preComineField)
|
||||||
|
}
|
||||||
|
val dataGen = new HoodieTestDataGenerator(0xDEEF)
|
||||||
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
|
||||||
// Bulk Insert Operation
|
// Bulk Insert Operation
|
||||||
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
|
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
|
||||||
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
|
||||||
inputDF1.write.format("org.apache.hudi")
|
inputDF1.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
|
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
|
||||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
|
||||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Overwrite)
|
.mode(SaveMode.Overwrite)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
|
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
|
||||||
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
|
||||||
inputDF2.write.format("org.apache.hudi")
|
inputDF2.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
|
|||||||
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
|
||||||
|
|
||||||
inputDF3.write.format("org.apache.hudi")
|
inputDF3.write.format("org.apache.hudi")
|
||||||
.options(commonOpts)
|
.options(options)
|
||||||
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
|
|
||||||
.mode(SaveMode.Append)
|
.mode(SaveMode.Append)
|
||||||
.save(basePath)
|
.save(basePath)
|
||||||
|
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
|
|||||||
|
|
||||||
import org.apache.avro.Schema
|
import org.apache.avro.Schema
|
||||||
import org.apache.hudi.common.config.HoodieMetadataConfig
|
import org.apache.hudi.common.config.HoodieMetadataConfig
|
||||||
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
|
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
|
||||||
import org.apache.hudi.common.table.HoodieTableConfig
|
import org.apache.hudi.common.table.HoodieTableConfig
|
||||||
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
|
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
|
||||||
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
|
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
|
||||||
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
|
|||||||
logWarning(s"Not matching bytes read ($bytesRead)")
|
logWarning(s"Not matching bytes read ($bytesRead)")
|
||||||
}
|
}
|
||||||
|
|
||||||
val readColumns = targetColumns ++ relation.mandatoryColumns
|
val readColumns = targetColumns ++ relation.mandatoryFields
|
||||||
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
|
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)
|
||||||
|
|
||||||
val row: InternalRow = rows.take(1).head
|
val row: InternalRow = rows.take(1).head
|
||||||
|
|||||||
Reference in New Issue
Block a user