From 034addaef5834eff09cfd9ac5cc2656df95ca0e8 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 9 Mar 2022 18:45:25 -0800 Subject: [PATCH] [HUDI-3396] Make sure `BaseFileOnlyViewRelation` only reads projected columns (#4818) NOTE: This change is first part of the series to clean up Hudi's Spark DataSource related implementations, making sure there's minimal code duplication among them, implementations are consistent and performant This PR is making sure that BaseFileOnlyViewRelation only reads projected columns as well as avoiding unnecessary serde from Row to InternalRow Brief change log - Introduced HoodieBaseRDD as a base for all custom RDD impls - Extracted common fields/methods to HoodieBaseRelation - Cleaned up and streamlined HoodieBaseFileViewOnlyRelation - Fixed all of the Relations to avoid superfluous Row <> InternalRow conversions --- .../apache/hudi/config/HoodieWriteConfig.java | 4 +- .../org/apache/hudi/HoodieSparkUtils.scala | 18 +- ...ait.scala => HoodieAvroDeserializer.scala} | 10 +- ...Trait.scala => HoodieAvroSerializer.scala} | 2 +- .../apache/spark/sql/hudi/SparkAdapter.scala | 10 +- .../SparkClientFunctionalTestHarness.java | 32 +- .../hudi/metadata/HoodieMetadataPayload.java | 2 +- .../hudi/BaseFileOnlyViewRelation.scala | 133 ++++--- .../org/apache/hudi/HoodieBaseRelation.scala | 78 +++- .../apache/hudi/HoodieDataSourceHelper.scala | 39 +- .../org/apache/hudi/HoodieFileScanRDD.scala | 60 +-- .../apache/hudi/HoodieMergeOnReadRDD.scala | 16 +- .../org/apache/hudi/HoodieUnsafeRDD.scala | 68 ++++ .../hudi/MergeOnReadIncrementalRelation.scala | 29 +- .../hudi/MergeOnReadSnapshotRelation.scala | 53 +-- .../apache/spark/HoodieUnsafeRDDUtils.scala | 44 +++ ....scala => HoodieSparkAvroSerializer.scala} | 4 +- ...estConvertFilterToCatalystExpression.scala | 8 +- .../functional/TestMORDataSourceStorage.scala | 3 + .../TestParquetColumnProjection.scala | 355 ++++++++++++++++++ .../spark/sql/adapter/Spark2Adapter.scala | 10 +- ...ala => HoodieSpark2AvroDeserializer.scala} | 10 +- .../spark/sql/adapter/Spark3Adapter.scala | 10 +- ...ala => HoodieSpark3AvroDeserializer.scala} | 6 +- .../functional/TestHDFSParquetImporter.java | 11 +- 25 files changed, 751 insertions(+), 264 deletions(-) rename hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/{HoodieAvroDeserializerTrait.scala => HoodieAvroDeserializer.scala} (75%) rename hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/{HoodieAvroSerializerTrait.scala => HoodieAvroSerializer.scala} (97%) create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala rename hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/{HoodieAvroSerializer.scala => HoodieSparkAvroSerializer.scala} (89%) create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala rename hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/{Spark2HoodieAvroDeserializer.scala => HoodieSpark2AvroDeserializer.scala} (76%) rename hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/{Spark3HoodieAvroDeserializer.scala => HoodieSpark3AvroDeserializer.scala} (89%) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 6053dcf3b..4202cbda7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; @@ -1552,7 +1553,8 @@ public class HoodieWriteConfig extends HoodieConfig { } public CompressionCodecName getParquetCompressionCodec() { - return CompressionCodecName.fromConf(getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME)); + String codecName = getString(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME); + return CompressionCodecName.fromConf(StringUtils.isNullOrEmpty(codecName) ? null : codecName); } public boolean parquetDictionaryEnabled() { diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index b288289ac..c96380641 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -177,14 +177,24 @@ object HoodieSparkUtils extends SparkAdapterSupport { * Convert Filters to Catalyst Expressions and joined by And. If convert success return an * Non-Empty Option[Expression],or else return None. */ - def convertToCatalystExpressions(filters: Array[Filter], - tableSchema: StructType): Option[Expression] = { - val expressions = filters.map(convertToCatalystExpression(_, tableSchema)) + def convertToCatalystExpressions(filters: Seq[Filter], + tableSchema: StructType): Seq[Option[Expression]] = { + filters.map(convertToCatalystExpression(_, tableSchema)) + } + + + /** + * Convert Filters to Catalyst Expressions and joined by And. If convert success return an + * Non-Empty Option[Expression],or else return None. + */ + def convertToCatalystExpression(filters: Array[Filter], + tableSchema: StructType): Option[Expression] = { + val expressions = convertToCatalystExpressions(filters, tableSchema) if (expressions.forall(p => p.isDefined)) { if (expressions.isEmpty) { None } else if (expressions.length == 1) { - expressions(0) + expressions.head } else { Some(expressions.map(_.get).reduce(org.apache.spark.sql.catalyst.expressions.And)) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala similarity index 75% rename from hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala index 5c3035304..4c4ddb5bf 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializerTrait.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroDeserializer.scala @@ -24,12 +24,6 @@ package org.apache.spark.sql.avro * If you're looking to convert Avro into "deserialized" [[Row]] (comprised of Java native types), * please check [[AvroConversionUtils]] */ -trait HoodieAvroDeserializerTrait { - final def deserialize(data: Any): Option[Any] = - doDeserialize(data) match { - case opt: Option[_] => opt // As of Spark 3.1, this will return data wrapped with Option, so we fetch the data - case row => Some(row) // For other Spark versions, return the data as is - } - - protected def doDeserialize(data: Any): Any +trait HoodieAvroDeserializer { + def deserialize(data: Any): Option[Any] } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala similarity index 97% rename from hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala rename to hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala index 159d8da74..84ba44b00 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializerTrait.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala @@ -23,6 +23,6 @@ package org.apache.spark.sql.avro * NOTE: This is low-level component operating on Spark internal data-types (comprising [[InternalRow]]). * If you're looking to convert "deserialized" [[Row]] into Avro, please check [[AvroConversionUtils]] */ -trait HoodieAvroSerializerTrait { +trait HoodieAvroSerializer { def serialize(catalystData: Any): Any } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index 62bdc4492..e41a9c1c8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.hudi import org.apache.avro.Schema import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait} +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder @@ -43,16 +43,16 @@ import java.util.Locale trait SparkAdapter extends Serializable { /** - * Creates instance of [[HoodieAvroSerializerTrait]] providing for ability to serialize + * Creates instance of [[HoodieAvroSerializer]] providing for ability to serialize * Spark's [[InternalRow]] into Avro payloads */ - def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait + def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer /** - * Creates instance of [[HoodieAvroDeserializerTrait]] providing for ability to deserialize + * Creates instance of [[HoodieAvroDeserializer]] providing for ability to deserialize * Avro payloads into Spark's [[InternalRow]] */ - def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait + def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer /** * Create the SparkRowSerDe. diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java index 94e080cae..f9676c6c4 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java @@ -19,6 +19,13 @@ package org.apache.hudi.testutils; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.client.HoodieReadClient; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -28,6 +35,7 @@ import org.apache.hudi.common.model.HoodieAvroPayload; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -42,6 +50,7 @@ import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.table.HoodieSparkTable; @@ -50,14 +59,11 @@ import org.apache.hudi.testutils.providers.HoodieMetaClientProvider; import org.apache.hudi.testutils.providers.HoodieWriteClientProvider; import org.apache.hudi.testutils.providers.SparkProvider; import org.apache.hudi.timeline.service.TimelineService; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; @@ -69,6 +75,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE; @@ -348,6 +355,21 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe .withRollbackUsingMarkers(rollbackUsingMarkers); } + protected Dataset toDataset(List records, Schema schema) { + List avroRecords = records.stream() + .map(r -> { + HoodieRecordPayload payload = (HoodieRecordPayload) r.getData(); + try { + return (GenericRecord) payload.getInsertValue(schema).get(); + } catch (IOException e) { + throw new HoodieIOException("Failed to extract Avro payload", e); + } + }) + .collect(Collectors.toList()); + JavaRDD jrdd = jsc.parallelize(avroRecords, 2); + return AvroConversionUtils.createDataFrame(jrdd.rdd(), schema.toString(), spark); + } + protected int incrementTimelineServicePortToUse() { // Increment the timeline service port for each individual test // to avoid port reuse causing failures diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java index 548fbb95d..c0ad8b147 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java @@ -133,7 +133,7 @@ public class HoodieMetadataPayload implements HoodieRecordPayload + *
  • For COW tables: Snapshot
  • + *
  • For MOR tables: Read-optimized
  • + * + * + * NOTE: The reason this Relation is used in liue of Spark's default [[HadoopFsRelation]] is primarily due to the + * fact that it injects real partition's path as the value of the partition field, which Hudi ultimately persists + * as part of the record payload. In some cases, however, partition path might not necessarily be equal to the + * verbatim value of the partition path field (when custom [[KeyGenerator]] is used) therefore leading to incorrect + * partition field values being written */ -class BaseFileOnlyViewRelation( - sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], - userSchema: Option[StructType], - globPaths: Seq[Path] - ) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { +class BaseFileOnlyViewRelation(sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType], + globPaths: Seq[Path]) + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) with SparkAdapterSupport { - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") + private val fileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, + FileStatusCache.getOrCreate(sqlContext.sparkSession)) - val filterExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) - .getOrElse(Literal(true, BooleanType)) - val (partitionFilters, dataFilters) = { - val splited = filters.map { filter => - HoodieDataSourceHelper.splitPartitionAndDataPredicates( - sparkSession, filterExpressions, partitionColumns) - } - (splited.flatMap(_._1), splited.flatMap(_._2)) - } - val partitionFiles = getPartitionFiles(partitionFilters, dataFilters) + override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { + // NOTE: In case list of requested columns doesn't contain the Primary Key one, we + // have to add it explicitly so that + // - Merging could be performed correctly + // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]], + // Spark still fetches all the rows to execute the query correctly + // + // It's okay to return columns that have not been requested by the caller, as those nevertheless will be + // filtered out upstream + val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns) - val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes - val filePartitions = sparkAdapter.getFilePartitions(sparkSession, partitionFiles, maxSplitBytes) + val (requiredAvroSchema, requiredStructSchema) = + HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns) - val requiredSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = sparkSession, - dataSchema = tableStructSchema, - partitionSchema = StructType(Nil), - requiredSchema = tableStructSchema, + val filterExpressions = convertToExpressions(filters) + val (partitionFilters, dataFilters) = filterExpressions.partition(isPartitionPredicate) + + val filePartitions = getPartitions(partitionFilters, dataFilters) + + val partitionSchema = StructType(Nil) + val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString) + val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString) + + val baseFileReader = createBaseFileReader( + spark = sparkSession, + partitionSchema = partitionSchema, + tableSchema = tableSchema, + requiredSchema = requiredSchema, filters = filters, options = optParams, - hadoopConf = sparkSession.sessionState.newHadoopConf() + // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it + // to configure Parquet reader appropriately + hadoopConf = new Configuration(conf) ) - new HoodieFileScanRDD(sparkSession, requiredColumns, tableStructSchema, - requiredSchemaParquetReader, filePartitions) + new HoodieFileScanRDD(sparkSession, baseFileReader, filePartitions) } - private def getPartitionFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionedFile] = { + private def getPartitions(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FilePartition] = { val partitionDirectories = if (globPaths.isEmpty) { val hoodieFileIndex = HoodieFileIndex(sparkSession, metaClient, userSchema, optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) @@ -89,18 +108,46 @@ class BaseFileOnlyViewRelation( inMemoryFileIndex.listFiles(partitionFilters, dataFilters) } - val partitionFiles = partitionDirectories.flatMap { partition => + val partitions = partitionDirectories.flatMap { partition => partition.files.flatMap { file => + // TODO move to adapter + // TODO fix, currently assuming parquet as underlying format HoodieDataSourceHelper.splitFiles( sparkSession = sparkSession, file = file, - partitionValues = partition.values + // TODO clarify why this is required + partitionValues = InternalRow.empty ) } } - partitionFiles.map{ f => - PartitionedFile(InternalRow.empty, f.filePath, f.start, f.length) - } + val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + + sparkAdapter.getFilePartitions(sparkSession, partitions, maxSplitBytes) } + + private def convertToExpressions(filters: Array[Filter]): Array[Expression] = { + val catalystExpressions = HoodieSparkUtils.convertToCatalystExpressions(filters, tableStructSchema) + + val failedExprs = catalystExpressions.zipWithIndex.filter { case (opt, _) => opt.isEmpty } + if (failedExprs.nonEmpty) { + val failedFilters = failedExprs.map(p => filters(p._2)) + logWarning(s"Failed to convert Filters into Catalyst expressions (${failedFilters.map(_.toString)})") + } + + catalystExpressions.filter(_.isDefined).map(_.get).toArray + } + + /** + * Checks whether given expression only references only references partition columns + * (and involves no sub-query) + */ + private def isPartitionPredicate(condition: Expression): Boolean = { + // Validates that the provided names both resolve to the same entity + val resolvedNameEquals = sparkSession.sessionState.analyzer.resolver + + condition.references.forall { r => partitionColumns.exists(resolvedNameEquals(r.name, _)) } && + !SubqueryExpression.hasSubquery(condition) + } + } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 1e2946dd2..e07b316d4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -22,38 +22,70 @@ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.io.hfile.CacheConfig +import org.apache.hadoop.mapred.JobConf +import org.apache.hudi.HoodieBaseRelation.isMetadataTable import org.apache.hudi.common.config.SerializableConfiguration import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.io.storage.HoodieHFileReader -import org.apache.hudi.metadata.HoodieTableMetadata +import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata} import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{SQLContext, SparkSession} +import org.apache.spark.sql.{Row, SQLContext, SparkSession} import scala.collection.JavaConverters._ import scala.util.Try case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String) +case class HoodieTableState(recordKeyField: String, + preCombineFieldOpt: Option[String]) + /** * Hoodie BaseRelation which extends [[PrunedFilteredScan]]. */ -abstract class HoodieBaseRelation( - val sqlContext: SQLContext, - metaClient: HoodieTableMetaClient, - optParams: Map[String, String], - userSchema: Option[StructType]) - extends BaseRelation with PrunedFilteredScan with Logging{ +abstract class HoodieBaseRelation(val sqlContext: SQLContext, + metaClient: HoodieTableMetaClient, + optParams: Map[String, String], + userSchema: Option[StructType]) + extends BaseRelation with PrunedFilteredScan with Logging { protected val sparkSession: SparkSession = sqlContext.sparkSession + protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) + protected lazy val jobConf = new JobConf(conf) + + // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one + // NOTE: This is historical behavior which is preserved as is + protected lazy val recordKeyField: String = + if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD + else metaClient.getTableConfig.getRecordKeyFieldProp + + protected lazy val preCombineFieldOpt: Option[String] = getPrecombineFieldProperty + + /** + * @VisibleInTests + */ + lazy val mandatoryColumns: Seq[String] = { + if (isMetadataTable(metaClient)) { + Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) + } else { + Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) + } + } + + protected lazy val specifiedQueryInstant: Option[String] = + optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) + protected lazy val tableAvroSchema: Schema = { val schemaUtil = new TableSchemaResolver(metaClient) Try(schemaUtil.getTableAvroSchema).getOrElse( @@ -81,6 +113,34 @@ abstract class HoodieBaseRelation( } override def schema: StructType = tableStructSchema + + /** + * This method controls whether relation will be producing + *
      + *
    • [[Row]], when it's being equal to true
    • + *
    • [[InternalRow]], when it's being equal to false
    • + *
    + * + * Returning [[InternalRow]] directly enables us to save on needless ser/de loop from [[InternalRow]] (being + * produced by file-reader) to [[Row]] and back + */ + override final def needConversion: Boolean = false + + /** + * NOTE: DO NOT OVERRIDE THIS METHOD + */ + override final def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + // Here we rely on a type erasure, to workaround inherited API restriction and pass [[RDD[InternalRow]]] back as [[RDD[Row]]] + // Please check [[needConversion]] scala-doc for more details + doBuildScan(requiredColumns, filters).asInstanceOf[RDD[Row]] + } + + protected def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] + + protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { + val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) + requestedColumns ++ missing + } } object HoodieBaseRelation { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index fb12549f6..40299cfdc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, SpecificInternalRow, SubqueryExpression, UnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.sources.Filter @@ -33,43 +33,6 @@ import scala.collection.JavaConverters._ object HoodieDataSourceHelper extends PredicateHelper { - /** - * Partition the given condition into two sequence of conjunctive predicates: - * - predicates that can be evaluated using metadata only. - * - other predicates. - */ - def splitPartitionAndDataPredicates( - spark: SparkSession, - condition: Expression, - partitionColumns: Seq[String]): (Seq[Expression], Seq[Expression]) = { - splitConjunctivePredicates(condition).partition( - isPredicateMetadataOnly(spark, _, partitionColumns)) - } - - /** - * Check if condition can be evaluated using only metadata. In Delta, this means the condition - * only references partition columns and involves no subquery. - */ - def isPredicateMetadataOnly( - spark: SparkSession, - condition: Expression, - partitionColumns: Seq[String]): Boolean = { - isPredicatePartitionColumnsOnly(spark, condition, partitionColumns) && - !SubqueryExpression.hasSubquery(condition) - } - - /** - * Does the predicate only contains partition columns? - */ - def isPredicatePartitionColumnsOnly( - spark: SparkSession, - condition: Expression, - partitionColumns: Seq[String]): Boolean = { - val nameEquality = spark.sessionState.analyzer.resolver - condition.references.forall { r => - partitionColumns.exists(nameEquality(r.name, _)) - } - } /** * Wrapper `buildReaderWithPartitionValues` of [[ParquetFileFormat]] diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala index 9f2d7d9e0..7e8f62bd2 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileScanRDD.scala @@ -18,56 +18,37 @@ package org.apache.hudi -import org.apache.spark.{Partition, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, SchemaColumnConvertNotSupportedException} -import org.apache.spark.sql.types.StructType +import org.apache.spark.{Partition, TaskContext} /** - * Similar to [[org.apache.spark.sql.execution.datasources.FileScanRDD]]. - * - * This class will extract the fields needed according to [[requiredColumns]] and - * return iterator of [[org.apache.spark.sql.Row]] directly. + * TODO eval if we actually need it */ -class HoodieFileScanRDD( - @transient private val sparkSession: SparkSession, - requiredColumns: Array[String], - schema: StructType, - readFunction: PartitionedFile => Iterator[InternalRow], - @transient val filePartitions: Seq[FilePartition]) - extends RDD[Row](sparkSession.sparkContext, Nil) { - - private val requiredSchema = { - val nameToStructField = schema.map(field => (field.name, field)).toMap - StructType(requiredColumns.map(nameToStructField)) - } - - private val requiredFieldPos = HoodieSparkUtils.collectFieldIndexes(requiredSchema, schema) - - override def compute(split: Partition, context: TaskContext): Iterator[Row] = { - val iterator = new Iterator[Object] with AutoCloseable { +class HoodieFileScanRDD(@transient private val sparkSession: SparkSession, + readFunction: PartitionedFile => Iterator[InternalRow], + @transient fileSplits: Seq[FilePartition]) + extends HoodieUnsafeRDD(sparkSession.sparkContext) { + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val iterator = new Iterator[InternalRow] with AutoCloseable { private[this] val files = split.asInstanceOf[FilePartition].files.toIterator - private[this] var currentFile: PartitionedFile = null - private[this] var currentIterator: Iterator[Object] = null + private[this] var currentFile: PartitionedFile = _ + private[this] var currentIterator: Iterator[InternalRow] = _ override def hasNext: Boolean = { (currentIterator != null && currentIterator.hasNext) || nextIterator() } - def next(): Object = { - currentIterator.next() - } + def next(): InternalRow = currentIterator.next() /** Advances to the next file. Returns true if a new non-empty iterator is available. */ private def nextIterator(): Boolean = { if (files.hasNext) { - currentFile = files.next() - logInfo(s"Reading File $currentFile") + currentFile = files.next() currentIterator = readFunction(currentFile) try { @@ -93,17 +74,8 @@ class HoodieFileScanRDD( // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener[Unit](_ => iterator.close()) - // extract required columns from row - val iterAfterExtract = HoodieDataSourceHelper.extractRequiredSchema( - iterator.asInstanceOf[Iterator[InternalRow]], - requiredSchema, - requiredFieldPos) - - // convert InternalRow to Row and return - val converter = CatalystTypeConverters.createToScalaConverter(requiredSchema) - iterAfterExtract.map(converter(_).asInstanceOf[Row]) + iterator.asInstanceOf[Iterator[InternalRow]] } - override protected def getPartitions: Array[Partition] = filePartitions.toArray - + override protected def getPartitions: Array[Partition] = fileSplits.toArray } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala index 4a4ea2cdd..3a518da32 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala @@ -36,7 +36,6 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.config.HoodieRealtimeConfig import org.apache.hudi.metadata.HoodieTableMetadata.getDataTableBasePathFromMetadataTable import org.apache.hudi.metadata.{HoodieBackedTableMetadata, HoodieTableMetadata} -import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeProjection import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -54,10 +53,11 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, @transient config: Configuration, fullSchemaFileReader: PartitionedFile => Iterator[InternalRow], requiredSchemaFileReader: PartitionedFile => Iterator[InternalRow], - tableState: HoodieMergeOnReadTableState, + tableState: HoodieTableState, tableSchema: HoodieTableSchema, - requiredSchema: HoodieTableSchema) - extends RDD[InternalRow](sc, Nil) { + requiredSchema: HoodieTableSchema, + @transient fileSplits: List[HoodieMergeOnReadFileSplit]) + extends HoodieUnsafeRDD(sc) { private val confBroadcast = sc.broadcast(new SerializableWritable(config)) private val recordKeyField = tableState.recordKeyField @@ -98,12 +98,8 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, iter } - override protected def getPartitions: Array[Partition] = { - tableState - .hoodieRealtimeFileSplits - .zipWithIndex - .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray - } + override protected def getPartitions: Array[Partition] = + fileSplits.zipWithIndex.map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray private def getConfig: Configuration = { val conf = confBroadcast.value.value diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala new file mode 100644 index 000000000..3f95746a5 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieUnsafeRDD.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.{Partition, SparkContext, TaskContext} + +/** + * !!! PLEASE READ CAREFULLY !!! + * + * Base class for all of the custom low-overhead RDD implementations for Hudi. + * + * To keep memory allocation footprint as low as possible, each inheritor of this RDD base class + * + *
    + *   1. Does NOT deserialize from [[InternalRow]] to [[Row]] (therefore only providing access to
    + *   Catalyst internal representations (often mutable) of the read row)
    + *
    + *   2. DOES NOT COPY UNDERLYING ROW OUT OF THE BOX, meaning that
    + *
    + *      a) access to this RDD is NOT thread-safe
    + *
    + *      b) iterating over it reference to a _mutable_ underlying instance (of [[InternalRow]]) is
    + *      returned, entailing that after [[Iterator#next()]] is invoked on the provided iterator,
    + *      previous reference becomes **invalid**. Therefore, you will have to copy underlying mutable
    + *      instance of [[InternalRow]] if you plan to access it after [[Iterator#next()]] is invoked (filling
    + *      it with the next row's payload)
    + *
    + *      c) due to item b) above, no operation other than the iteration will produce meaningful
    + *      results on it and will likely fail [1]
    + * 
    + * + * [1] For example, [[RDD#collect]] method on this implementation would not work correctly, as it's + * simply using Scala's default [[Iterator#toArray]] method which will simply concat all the references onto + * the same underlying mutable object into [[Array]]. Instead each individual [[InternalRow]] _has to be copied_, + * before concatenating into the final output. Please refer to [[HoodieRDDUtils#collect]] for more details. + * + * NOTE: It enforces, for ex, that all of the RDDs implement [[compute]] method returning + * [[InternalRow]] to avoid superfluous ser/de + */ +abstract class HoodieUnsafeRDD(@transient sc: SparkContext) + extends RDD[InternalRow](sc, Nil) { + + def compute(split: Partition, context: TaskContext): Iterator[InternalRow] + + override final def collect(): Array[InternalRow] = + throw new UnsupportedOperationException( + "This method will not function correctly, please refer to scala-doc for HoodieUnsafeRDD" + ) +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index b9d18c68d..8308e3b7e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -19,7 +19,6 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{GlobPattern, Path} -import org.apache.hadoop.mapred.JobConf import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.table.HoodieTableMetaClient @@ -28,11 +27,11 @@ import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.{getCommitMetadata, getWritePartitionPaths, listAffectedFilesForCommits} import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources.PartitionedFile import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} import scala.collection.JavaConversions._ @@ -47,9 +46,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, val metaClient: HoodieTableMetaClient) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - private val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - private val jobConf = new JobConf(conf) - private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants() if (commitTimeline.empty()) { throw new HoodieException("No instants to incrementally pull") @@ -77,8 +73,6 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex() - private val preCombineFieldOpt = getPrecombineFieldProperty - // Record filters making sure that only records w/in the requested bounds are being fetched as part of the // scan collected by this relation private lazy val incrementalSpanRecordsFilters: Seq[Filter] = { @@ -88,18 +82,16 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, Seq(isNotNullFilter, largerThanFilter, lessThanFilter) } - private lazy val mandatoryColumns = { + override lazy val mandatoryColumns: Seq[String] = { // NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in // cases when no columns are requested to be fetched (for ex, when using {@code count()} API) Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) } - override def needConversion: Boolean = false - - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { if (fileIndex.isEmpty) { - sqlContext.sparkContext.emptyRDD[Row] + sqlContext.sparkContext.emptyRDD[InternalRow] } else { logDebug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") logDebug(s"buildScan filters = ${filters.mkString(",")}") @@ -148,20 +140,20 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, hadoopConf = new Configuration(conf) ) - val hoodieTableState = HoodieMergeOnReadTableState(fileIndex, HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) + val hoodieTableState = HoodieTableState(HoodieRecord.RECORD_KEY_METADATA_FIELD, preCombineFieldOpt) // TODO implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately // filtered, since file-reader might not be capable to perform filtering - val rdd = new HoodieMergeOnReadRDD( + new HoodieMergeOnReadRDD( sqlContext.sparkContext, jobConf, fullSchemaParquetReader, requiredSchemaParquetReader, hoodieTableState, tableSchema, - requiredSchema + requiredSchema, + fileIndex ) - rdd.asInstanceOf[RDD[Row]] } } @@ -225,9 +217,4 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext, latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }) } - - private def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { - val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) - requestedColumns ++ missing - } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 7c1a3540c..6156054b4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -20,22 +20,19 @@ package org.apache.hudi import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapred.JobConf -import org.apache.hudi.HoodieBaseRelation.{createBaseFileReader, isMetadataTable} +import org.apache.hudi.HoodieBaseRelation.createBaseFileReader import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecord} import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes -import org.apache.hudi.metadata.HoodieMetadataPayload import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} -import org.apache.spark.sql.hudi.HoodieSqlCommonUtils import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} import scala.collection.JavaConverters._ @@ -46,10 +43,6 @@ case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], maxCompactionMemoryInBytes: Long, mergeType: String) -case class HoodieMergeOnReadTableState(hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit], - recordKeyField: String, - preCombineFieldOpt: Option[String]) - class MergeOnReadSnapshotRelation(sqlContext: SQLContext, optParams: Map[String, String], val userSchema: Option[StructType], @@ -57,38 +50,13 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val metaClient: HoodieTableMetaClient) extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema) { - private val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) - private val jobConf = new JobConf(conf) - private val mergeType = optParams.getOrElse( DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_MERGE.defaultValue) private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) - // If meta fields are enabled, always prefer key from the meta field as opposed to user-specified one - // NOTE: This is historical behavior which is preserved as is - private val recordKeyField = { - if (metaClient.getTableConfig.populateMetaFields()) HoodieRecord.RECORD_KEY_METADATA_FIELD - else metaClient.getTableConfig.getRecordKeyFieldProp - } - - private val preCombineFieldOpt = getPrecombineFieldProperty - - private lazy val mandatoryColumns = { - if (isMetadataTable(metaClient)) { - Seq(HoodieMetadataPayload.KEY_FIELD_NAME, HoodieMetadataPayload.SCHEMA_FIELD_NAME_TYPE) - } else { - Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) - } - } - - override def needConversion: Boolean = false - - private val specifiedQueryInstant = optParams.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) - .map(HoodieSqlCommonUtils.formatQueryInstant) - - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + override def doBuildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[InternalRow] = { log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}") log.debug(s" buildScan filters = ${filters.mkString(",")}") @@ -137,12 +105,10 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, hadoopConf = new Configuration(conf) ) - val tableState = HoodieMergeOnReadTableState(fileIndex, recordKeyField, preCombineFieldOpt) + val tableState = HoodieTableState(recordKeyField, preCombineFieldOpt) - val rdd = new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, - requiredSchemaParquetReader, tableState, tableSchema, requiredSchema) - - rdd.asInstanceOf[RDD[Row]] + new HoodieMergeOnReadRDD(sqlContext.sparkContext, jobConf, fullSchemaParquetReader, + requiredSchemaParquetReader, tableState, tableSchema, requiredSchema, fileIndex) } def buildFileIndex(filters: Array[Filter]): List[HoodieMergeOnReadFileSplit] = { @@ -193,7 +159,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, val partitionColumns = hoodieFileIndex.partitionSchema.fieldNames.toSet val partitionFilters = filters.filter(f => f.references.forall(p => partitionColumns.contains(p))) val partitionFilterExpression = - HoodieSparkUtils.convertToCatalystExpressions(partitionFilters, tableStructSchema) + HoodieSparkUtils.convertToCatalystExpression(partitionFilters, tableStructSchema) val convertedPartitionFilterExpression = HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, partitionFilterExpression.toSeq) @@ -231,11 +197,6 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext, } } } - - private def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = { - val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col)) - requestedColumns ++ missing - } } object MergeOnReadSnapshotRelation { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala new file mode 100644 index 000000000..1ac8fa098 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieUnsafeRDDUtils.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.hudi.HoodieUnsafeRDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.util.MutablePair + +/** + * Suite of utilities helping in handling instances of [[HoodieUnsafeRDD]] + */ +object HoodieUnsafeRDDUtils { + + /** + * Canonical implementation of the [[RDD#collect]] for [[HoodieUnsafeRDD]], returning a properly + * copied [[Array]] of [[InternalRow]]s + */ + def collect(rdd: HoodieUnsafeRDD): Array[InternalRow] = { + rdd.mapPartitionsInternal { iter => + // NOTE: We're leveraging [[MutablePair]] here to avoid unnecessary allocations, since + // a) iteration is performed lazily and b) iteration is single-threaded (w/in partition) + val pair = new MutablePair[InternalRow, Null]() + iter.map(row => pair.update(row.copy(), null)) + } + .map(p => p._1) + .collect() + } + +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala similarity index 89% rename from hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala rename to hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala index 050efbd3d..4a3a7c452 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieAvroSerializer.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSerializer.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.avro import org.apache.avro.Schema import org.apache.spark.sql.types.DataType -class HoodieAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) - extends HoodieAvroSerializerTrait { +class HoodieSparkAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean) + extends HoodieAvroSerializer { val avroSerializer = new AvroSerializer(rootCatalystType, rootAvroType, nullable) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala index 9b1b88d34..8aa47ffc2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestConvertFilterToCatalystExpression.scala @@ -17,11 +17,9 @@ package org.apache.hudi -import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpressions import org.apache.hudi.HoodieSparkUtils.convertToCatalystExpression - -import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, GreaterThan, GreaterThanOrEqual, In, IsNotNull, IsNull, LessThan, LessThanOrEqual, Not, Or, StringContains, StringEndsWith, StringStartsWith} -import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test @@ -93,7 +91,7 @@ class TestConvertFilterToCatalystExpression { } else { expectExpression } - val exp = convertToCatalystExpressions(filters, tableSchema) + val exp = convertToCatalystExpression(filters, tableSchema) if (removeQuotesIfNeed == null) { assertEquals(exp.isEmpty, true) } else { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala index 315a14c9d..18b639f2f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala @@ -26,6 +26,7 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.testutils.SparkClientFunctionalTestHarness import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.log4j.LogManager import org.apache.spark.sql._ import org.apache.spark.sql.functions.{col, lit} import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} @@ -39,6 +40,8 @@ import scala.collection.JavaConversions._ @Tag("functional") class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness { + private val log = LogManager.getLogger(classOf[TestMORDataSourceStorage]) + val commonOpts = Map( "hoodie.insert.shuffle.parallelism" -> "4", "hoodie.upsert.shuffle.parallelism" -> "4", diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala new file mode 100644 index 000000000..a96308174 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.functional + +import org.apache.avro.Schema +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD} +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator} +import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig} +import org.apache.hudi.keygen.NonpartitionedKeyGenerator +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness +import org.apache.parquet.hadoop.util.counters.BenchmarkCounter +import org.apache.spark.HoodieUnsafeRDDUtils +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Dataset, Row, SaveMode} +import org.apache.spark.sql.catalyst.InternalRow +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Test} + +import scala.:+ +import scala.collection.JavaConverters._ + +@Tag("functional") +class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with Logging { + + val defaultWriteOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + HoodieMetadataConfig.ENABLE.key -> "true", + // NOTE: It's critical that we use non-partitioned table, since the way we track amount of bytes read + // is not robust, and works most reliably only when we read just a single file. As such, making table + // non-partitioned makes it much more likely just a single file will be written + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName + ) + + @Test + def testBaseFileOnlyViewRelation(): Unit = { + val tablePath = s"$basePath/cow" + val targetRecordsCount = 100 + val (_, schema) = bootstrapTable(tablePath, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, targetRecordsCount, + defaultWriteOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, 0.0) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test COW / Snapshot + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, "", projectedColumnsReadStats) + } + + @Test + def testMergeOnReadSnapshotRelationWithDeltaLogs(): Unit = { + val tablePath = s"$basePath/mor-with-logs" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.5 + + val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Stats for the reads fetching _all_ columns (note, how amount of bytes read + // is invariant of the # of columns) + val fullColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 14665), + ("rider,driver", 14665), + ("rider,driver,tip_history", 14665)) + else if (HoodieSparkUtils.isSpark2) + // TODO re-enable tests (these tests are very unstable currently) + Array( + ("rider", -1), + ("rider,driver", -1), + ("rider,driver,tip_history", -1)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test MOR / Snapshot / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Snapshot / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, fullColumnsReadStats) + + // Test MOR / Read Optimized + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats) + } + + @Test + def testMergeOnReadSnapshotRelationWithNoDeltaLogs(): Unit = { + val tablePath = s"$basePath/mor-no-logs" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.0 + + val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, defaultWriteOpts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // + // Test #1: MOR table w/ Delta Logs + // + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 2452), + ("rider,driver", 2552), + ("rider,driver,tip_history", 3517)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 2595), + ("rider,driver", 2735), + ("rider,driver,tip_history", 3750)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Test MOR / Snapshot / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Snapshot / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, projectedColumnsReadStats) + + // Test MOR / Read Optimized + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, "null", projectedColumnsReadStats) + } + + // TODO add test for incremental query of the table with logs + @Test + def testMergeOnReadIncrementalRelationWithNoDeltaLogs(): Unit = { + val tablePath = s"$basePath/mor-no-logs" + val targetRecordsCount = 100 + val targetUpdatedRecordsRatio = 0.0 + + val opts: Map[String, String] = + // NOTE: Parquet Compression is disabled as it was leading to non-deterministic outcomes when testing + // against Spark 2.x + defaultWriteOpts ++ Seq(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key -> "") + + val (_, schema) = bootstrapMORTable(tablePath, targetRecordsCount, targetUpdatedRecordsRatio, opts, populateMetaFields = true) + val tableState = TableState(tablePath, schema, targetRecordsCount, targetUpdatedRecordsRatio) + + // Stats for the reads fetching only _projected_ columns (note how amount of bytes read + // increases along w/ the # of columns) + val projectedColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 4219), + ("rider,driver", 4279), + ("rider,driver,tip_history", 5186)) + else if (HoodieSparkUtils.isSpark2) + Array( + ("rider", 4430), + ("rider,driver", 4530), + ("rider,driver,tip_history", 5487)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + // Stats for the reads fetching _all_ columns (note, how amount of bytes read + // is invariant of the # of columns) + val fullColumnsReadStats: Array[(String, Long)] = + if (HoodieSparkUtils.isSpark3) + Array( + ("rider", 19683), + ("rider,driver", 19683), + ("rider,driver,tip_history", 19683)) + else if (HoodieSparkUtils.isSpark2) + // TODO re-enable tests (these tests are very unstable currently) + Array( + ("rider", -1), + ("rider,driver", -1), + ("rider,driver,tip_history", -1)) + else + fail("Only Spark 3 and Spark 2 are currently supported") + + val incrementalOpts: Map[String, String] = Map( + DataSourceReadOptions.BEGIN_INSTANTTIME.key -> "001" + ) + + // Test MOR / Incremental / Skip-merge + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL, + projectedColumnsReadStats, incrementalOpts) + + // Test MOR / Incremental / Payload-combine + runTest(tableState, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL, + fullColumnsReadStats, incrementalOpts) + } + + + // Test routine + private def runTest(tableState: TableState, + queryType: String, + mergeType: String, + expectedStats: Array[(String, Long)], + additionalOpts: Map[String, String] = Map.empty): Unit = { + val tablePath = tableState.path + val readOpts = defaultWriteOpts ++ Map( + "path" -> tablePath, + DataSourceReadOptions.QUERY_TYPE.key -> queryType, + DataSourceReadOptions.REALTIME_MERGE.key -> mergeType + ) ++ additionalOpts + + val ds = new DefaultSource() + val relation: HoodieBaseRelation = ds.createRelation(spark.sqlContext, readOpts).asInstanceOf[HoodieBaseRelation] + + for ((columnListStr, expectedBytesRead) <- expectedStats) { + val targetColumns = columnListStr.split(",") + + println(s"Running test for $tablePath / $queryType / $mergeType / $columnListStr") + + val (rows, bytesRead) = measureBytesRead { () => + val rdd = relation.buildScan(targetColumns, Array.empty).asInstanceOf[HoodieUnsafeRDD] + HoodieUnsafeRDDUtils.collect(rdd) + } + + val targetRecordCount = tableState.targetRecordCount; + val targetUpdatedRecordsRatio = tableState.targetUpdatedRecordsRatio + + val expectedRecordCount = + if (DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL.equals(mergeType)) targetRecordCount * (1 + targetUpdatedRecordsRatio) + else targetRecordCount + + assertEquals(expectedRecordCount, rows.length) + if (expectedBytesRead != -1) { + assertEquals(expectedBytesRead, bytesRead) + } else { + logWarning(s"Not matching bytes read ($bytesRead)") + } + + val readColumns = targetColumns ++ relation.mandatoryColumns + val (_, projectedStructType) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns) + + val row: InternalRow = rows.take(1).head + + // This check is mostly about making sure InternalRow deserializes properly into projected schema + val deserializedColumns = row.toSeq(projectedStructType) + assertEquals(readColumns.length, deserializedColumns.size) + } + } + + private def bootstrapTable(path: String, + tableType: String, + recordCount: Int, + opts: Map[String, String], + populateMetaFields: Boolean, + dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = { + val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345)) + + // Bulk Insert Operation + val schema = + if (populateMetaFields) HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS + else HoodieTestDataGenerator.AVRO_SCHEMA + + val records = dataGen.generateInserts("001", recordCount) + val inputDF: Dataset[Row] = toDataset(records, HoodieTestDataGenerator.AVRO_SCHEMA) + + inputDF.write.format("org.apache.hudi") + .options(opts) + .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Overwrite) + .save(path) + + (records.asScala.toList, schema) + } + + private def bootstrapMORTable(path: String, + recordCount: Int, + updatedRecordsRatio: Double, + opts: Map[String, String], + populateMetaFields: Boolean, + dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = { + val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345)) + + // Step 1: Bootstrap table w/ N records (t/h bulk-insert) + val (insertedRecords, schema) = bootstrapTable(path, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, recordCount, opts, populateMetaFields, Some(dataGen)) + + if (updatedRecordsRatio == 0) { + (insertedRecords, schema) + } else { + val updatesCount = (insertedRecords.length * updatedRecordsRatio).toInt + val recordsToUpdate = insertedRecords.take(updatesCount) + val updatedRecords = dataGen.generateUpdates("002", recordsToUpdate.asJava) + + // Step 2: Update M records out of those (t/h update) + val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA) + + inputDF.write.format("org.apache.hudi") + .options(opts) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) + .mode(SaveMode.Append) + .save(path) + + (updatedRecords.asScala.toList ++ insertedRecords.drop(updatesCount), schema) + } + } + + def measureBytesRead[T](f: () => T): (T, Int) = { + // Init BenchmarkCounter to report number of bytes actually read from the Block + BenchmarkCounter.initCounterFromReporter(HadoopMapRedUtils.createTestReporter, fs.getConf) + val r = f.apply() + val bytesRead = BenchmarkCounter.getBytesRead.toInt + (r, bytesRead) + } + + case class TableState(path: String, schema: Schema, targetRecordCount: Long, targetUpdatedRecordsRatio: Double) +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index f7fa2335f..54c8b912a 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema import org.apache.hudi.Spark2RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe -import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait, Spark2HoodieAvroDeserializer, HoodieAvroSerializer} +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark2AvroDeserializer, HoodieSparkAvroSerializer} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -42,11 +42,11 @@ import scala.collection.mutable.ArrayBuffer */ class Spark2Adapter extends SparkAdapter { - def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait = - new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable) + def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = + new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable) - def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait = - new Spark2HoodieAvroDeserializer(rootAvroType, rootCatalystType) + def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = + new HoodieSpark2AvroDeserializer(rootAvroType, rootCatalystType) override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = { new Spark2RowSerDe(encoder) diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala similarity index 76% rename from hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala rename to hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala index ac2c82f70..2b55c6695 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/Spark2HoodieAvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/HoodieSpark2AvroDeserializer.scala @@ -21,13 +21,15 @@ import org.apache.avro.Schema import org.apache.spark.sql.types.DataType /** - * This is Spark 2 implementation for the [[HoodieAvroDeserializerTrait]] leveraging [[PatchedAvroDeserializer]], + * This is Spark 2 implementation for the [[HoodieAvroDeserializer]] leveraging [[PatchedAvroDeserializer]], * which is just copied over version of [[AvroDeserializer]] from Spark 2.4.4 w/ SPARK-30267 being back-ported to it */ -class Spark2HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) - extends HoodieAvroDeserializerTrait { +class HoodieSpark2AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) + extends HoodieAvroDeserializer { private val avroDeserializer = new PatchedAvroDeserializer(rootAvroType, rootCatalystType) - def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data) + // As of Spark 3.1, this will return data wrapped with Option, so we make sure these interfaces + // are aligned across Spark versions + def deserialize(data: Any): Option[Any] = Some(avroDeserializer.deserialize(data)) } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala index d0328fb4d..ad338323e 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/Spark3Adapter.scala @@ -21,7 +21,7 @@ import org.apache.avro.Schema import org.apache.hudi.Spark3RowSerDe import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.spark3.internal.ReflectUtil -import org.apache.spark.sql.avro.{HoodieAvroDeserializerTrait, HoodieAvroSerializerTrait, Spark3HoodieAvroDeserializer, HoodieAvroSerializer} +import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSerializer, HoodieSpark3AvroDeserializer, HoodieSparkAvroSerializer} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.{Expression, Like} @@ -43,11 +43,11 @@ import org.apache.spark.sql.{Row, SparkSession} */ class Spark3Adapter extends SparkAdapter { - def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializerTrait = - new HoodieAvroSerializer(rootCatalystType, rootAvroType, nullable) + def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean): HoodieAvroSerializer = + new HoodieSparkAvroSerializer(rootCatalystType, rootAvroType, nullable) - def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializerTrait = - new Spark3HoodieAvroDeserializer(rootAvroType, rootCatalystType) + def createAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType): HoodieAvroDeserializer = + new HoodieSpark3AvroDeserializer(rootAvroType, rootCatalystType) override def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe = { new Spark3RowSerDe(encoder) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala similarity index 89% rename from hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala rename to hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala index fa03f5d84..bd9ead5a7 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/Spark3HoodieAvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/avro/HoodieSpark3AvroDeserializer.scala @@ -21,8 +21,8 @@ import org.apache.avro.Schema import org.apache.hudi.HoodieSparkUtils import org.apache.spark.sql.types.DataType -class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) - extends HoodieAvroDeserializerTrait { +class HoodieSpark3AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) + extends HoodieAvroDeserializer { // SPARK-34404: As of Spark3.2, there is no AvroDeserializer's constructor with Schema and DataType arguments. // So use the reflection to get AvroDeserializer instance. @@ -34,5 +34,5 @@ class Spark3HoodieAvroDeserializer(rootAvroType: Schema, rootCatalystType: DataT constructor.newInstance(rootAvroType, rootCatalystType) } - def doDeserialize(data: Any): Any = avroDeserializer.deserialize(data) + def deserialize(data: Any): Option[Any] = avroDeserializer.deserialize(data) } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java index a57be6246..9a62c14e5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java @@ -249,14 +249,17 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); // 10 for update + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); for (long recordNum = 0; recordNum < 11; recordNum++) { - records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, - "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); + records.add( + dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, + "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } // 4 for insert for (long recordNum = 96; recordNum < 100; recordNum++) { - records.add(new HoodieTestDataGenerator().generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, - "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); + records.add( + dataGen.generateGenericRecord(Long.toString(recordNum), "0", "rider-upsert-" + recordNum, + "driver-upsert" + recordNum, startTime + TimeUnit.HOURS.toSeconds(recordNum))); } try (ParquetWriter writer = AvroParquetWriter.builder(srcFile) .withSchema(HoodieTestDataGenerator.AVRO_SCHEMA).withConf(HoodieTestUtils.getDefaultHadoopConf()).build()) {