1
0

[HUDI-3215] Solve UT for Spark 3.2 (#4565)

This commit is contained in:
Yann Byron
2022-01-27 06:48:26 +08:00
committed by GitHub
parent 3f21e5f14c
commit 4a9f826382
5 changed files with 78 additions and 37 deletions

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -30,9 +31,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro;
import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldVal;
/**
* {@link HoodieRecordPayload} impl that honors ordering field in both preCombine and combineAndGetUpdateValue.
* <p>
@@ -57,7 +55,7 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
return Option.empty();
}
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
@@ -81,7 +79,7 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
if (recordBytes.length == 0) {
return Option.empty();
}
GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);
return isDeleteRecord(incomingRecord) ? Option.empty() : Option.of(incomingRecord);
@@ -123,4 +121,24 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY), true, consistentLogicalTimestampEnabled);
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
/**
 * Wrapper around {@code HoodieAvroUtils.getNestedFieldVal} that adds a safety net:
 * if the delegate throws even though {@code returnNullIfNotFound} is true (i.e. the
 * flag did not take effect), the exception is swallowed and {@code null} is returned.
 * When the flag is false, any exception from the delegate is rethrown unchanged.
 *
 * NOTE(review): catches broad {@code Exception}; presumably intended to absorb
 * field-resolution errors from the delegate — confirm no other failures should escape.
 *
 * @param record the Avro record to read from
 * @param fieldName possibly nested field name to resolve
 * @param returnNullIfNotFound when true, missing fields (or any lookup failure) yield null
 * @param consistentLogicalTimestampEnabled passed through to the delegate unchanged
 * @return the resolved field value, or null per the rules above
 */
private static Object getNestedFieldVal(
    GenericRecord record,
    String fieldName,
    boolean returnNullIfNotFound,
    boolean consistentLogicalTimestampEnabled) {
  try {
    return HoodieAvroUtils.getNestedFieldVal(record, fieldName, returnNullIfNotFound, consistentLogicalTimestampEnabled);
  } catch (Exception e) {
    if (returnNullIfNotFound) {
      return null;
    } else {
      throw e;
    }
  }
}
}

View File

@@ -28,7 +28,7 @@ import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.{HoodieAvroSerializer, HoodieAvroDeserializer}
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -36,6 +36,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import java.io.Closeable
import java.util.Properties
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -54,15 +55,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
private val preCombineField = tableState.preCombineField
private val recordKeyFieldOpt = tableState.recordKeyFieldOpt
private val payloadProps = if (preCombineField.isDefined) {
Some(HoodiePayloadConfig.newBuilder.withPayloadOrderingField(preCombineField.get).build.getProps)
HoodiePayloadConfig.newBuilder
.withPayloadOrderingField(preCombineField.get)
.build.getProps
} else {
None
new Properties()
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
val iter = mergeOnReadPartition.split match {
case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
val rows = read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
extractRequiredSchema(rows)
case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
logFileIterator(logFileOnlySplit, getConfig)
case skipMergeSplit if skipMergeSplit.mergeType
@@ -118,6 +122,18 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
rows
}
/**
 * Projects rows that were read with the full table schema down to the required
 * (pruned) schema, producing `UnsafeRow`s in required-field order.
 *
 * Fix: the original also parsed `tableState.requiredAvroSchema` into an unused
 * local (`requiredAvroSchema`), paying a schema-parse cost per call for nothing;
 * that dead local is removed here.
 *
 * @param iter rows laid out according to the full table Avro schema
 * @return rows containing only the required fields, in required-schema order
 */
private def extractRequiredSchema(iter: Iterator[InternalRow]): Iterator[InternalRow] = {
  val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
  // Position of each required field within the full table schema, in required order.
  val requiredFieldPosition = tableState.requiredStructSchema
    .map(f => tableAvroSchema.getField(f.name).pos()).toList
  val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
  iter.map(row => unsafeProjection(createRowWithRequiredSchema(row, requiredFieldPosition)))
}
private def logFileIterator(split: HoodieMergeOnReadFileSplit,
config: Configuration): Iterator[InternalRow] =
new Iterator[InternalRow] with Closeable {
@@ -188,7 +204,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@scala.annotation.tailrec
override def hasNext: Boolean = {
if (baseFileIterator.hasNext) {
recordToLoad = baseFileIterator.next()
val curRow = baseFileIterator.next()
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
true
} else {
if (logRecordsKeyIterator.hasNext) {
@@ -272,7 +289,7 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
} else {
// No merge needed, load current row with required schema
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow, requiredFieldPosition))
true
}
} else {
@@ -317,32 +334,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
}
}
private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
val posIterator = requiredFieldPosition.iterator
var curIndex = 0
tableState.requiredStructSchema.foreach(
f => {
val curPos = posIterator.next()
val curField = if (row.isNullAt(curPos)) null else row.get(curPos, f.dataType)
rowToReturn.update(curIndex, curField)
curIndex = curIndex + 1
}
)
rowToReturn
}
private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
if (payloadProps.isDefined) {
logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord,
tableAvroSchema, payloadProps.get)
} else {
logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
}
logRecords.get(curKey).getData.combineAndGetUpdateValue(
historyAvroRecord, tableAvroSchema, payloadProps)
}
}
}
/**
 * Builds a row holding only the required fields, copying each one out of `row`
 * (which is laid out per the full table schema) into required-schema order.
 *
 * @param row source row in full-table-schema layout
 * @param requiredFieldPosition source position of each required field, aligned
 *                              index-for-index with `tableState.requiredStructSchema`
 * @return a freshly built row in required-schema field order
 */
private def createRowWithRequiredSchema(row: InternalRow, requiredFieldPosition: Seq[Int]): InternalRow = {
  val projectedRow = new SpecificInternalRow(tableState.requiredStructSchema)
  // Pair every required field with its source position, then copy by target index.
  tableState.requiredStructSchema.zip(requiredFieldPosition).zipWithIndex.foreach {
    case ((field, srcPos), targetIdx) =>
      val value = if (row.isNullAt(srcPos)) null else row.get(srcPos, field.dataType)
      projectedRow.update(targetIdx, value)
  }
  projectedRow
}
}
private object HoodieMergeOnReadRDD {
val CONFIG_INSTANTIATION_LOCK = new Object()

View File

@@ -19,16 +19,18 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
import org.apache.hudi.common.config.{HoodieConfig, HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType, HoodieTimelineTimeZone, WriteOperationType}
import org.apache.hudi.common.model._
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils, StringUtils}
@@ -42,12 +44,14 @@ import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.sync.common.AbstractSyncTool
import org.apache.hudi.table.BulkInsertPartitioner
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.apache.spark.SparkContext
import java.util.Properties
@@ -730,6 +734,11 @@ object HoodieSparkSqlWriter {
mergedParams(key) = value
}
}
// use preCombineField to fill in PAYLOAD_ORDERING_FIELD_PROP_KEY
if (mergedParams.contains(PRECOMBINE_FIELD.key())) {
mergedParams.put(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, mergedParams(PRECOMBINE_FIELD.key()))
}
val params = mergedParams.toMap
(params, HoodieWriterUtils.convertMapToHoodieConfig(params))
}

View File

@@ -145,7 +145,7 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = requiredStructSchema,
requiredSchema = tableStructSchema,
filters = pushDownFilter,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()

View File

@@ -131,7 +131,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
sparkSession = sqlContext.sparkSession,
dataSchema = tableStructSchema,
partitionSchema = StructType(Nil),
requiredSchema = requiredStructSchema,
requiredSchema = tableStructSchema,
filters = filters,
options = optParams,
hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()